Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StateMachine rewrite #172

Merged
merged 31 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
13c2b0f
StateMachine refactoring
mfateev Aug 7, 2020
0c08449
PR feedback and broken unit test
mfateev Aug 8, 2020
250f70d
Fixed workflow task failure reporting
mfateev Aug 8, 2020
68db6c6
WorkflowTaskResult now is created through a Builder
mfateev Aug 8, 2020
1b1254b
StateMachine nomenclature cleanup
mfateev Aug 8, 2020
8243071
Merge branch 'master' of github.com:temporalio/temporal-java-sdk into…
mfateev Aug 9, 2020
4df71d6
initial command refactoring
mfateev Aug 9, 2020
e1e3fa6
Added ActivityStateMachineTest and fixed cancellation failure
mfateev Aug 9, 2020
190f3f1
Added more activity state machine tests
mfateev Aug 9, 2020
f6ed96a
Created StateMachineDefinition
mfateev Aug 10, 2020
f94e9ec
missing final
mfateev Aug 10, 2020
ef3e870
Added state diagram coverage diagram
mfateev Aug 10, 2020
eb1dfd9
100% coverage of ActivityStateMachine
mfateev Aug 10, 2020
24e6d7f
PR comments
mfateev Aug 10, 2020
49713bd
UnsupportedVersion exception. Cleanup
mfateev Aug 10, 2020
8a22826
Updated order of state transition in activity state machine
mfateev Aug 11, 2020
a8ba786
Sort of the PlantUML diagrams
mfateev Aug 11, 2020
5397c17
PR feedback
mfateev Aug 11, 2020
731f902
Added coverage to LocalActivityStateMachineTest
mfateev Aug 11, 2020
088cbcf
Added coverage to versionStateMachineTest
mfateev Aug 11, 2020
0b7bc47
Cancelled->canceled
mfateev Aug 11, 2020
dd642f0
Simplified local activity handling
mfateev Aug 12, 2020
1cc7b0c
allOf, anyOf and iterator
mfateev Aug 12, 2020
abf241c
event loop
mfateev Aug 12, 2020
65ffd00
Refactored ReplayWorkflowExecutor
mfateev Aug 13, 2020
919cad6
Renamed StatefulTaskHandler to WorkflowRunTaskHandler
mfateev Aug 13, 2020
d6918a0
Fixed local activity retries
mfateev Aug 13, 2020
92a3e36
PR comments
mfateev Aug 13, 2020
0a4dc38
PR feedback
mfateev Aug 13, 2020
81a5fb3
Fixed Saga defaults
mfateev Aug 13, 2020
4ddab73
Fixed sticky task and cache invalidation
mfateev Aug 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ plugins {
id 'maven-publish'
id 'signing'
id 'de.marcphilipp.nexus-publish' version '0.4.0'
id 'name.remal.check-updates' version '1.0.203'
id 'name.remal.check-updates' version '1.0.209'
}

repositories {
Expand Down Expand Up @@ -98,15 +98,15 @@ dependencies {
api group: 'com.uber.m3', name: 'tally-core', version: '0.6.1'
api group: 'io.micrometer', name: 'micrometer-core', version: '1.5.2'
api group: 'org.slf4j', name: 'slf4j-api', version: '1.7.30'
api 'io.grpc:grpc-protobuf:1.30.2'
api 'io.grpc:grpc-stub:1.30.2'
api 'io.grpc:grpc-protobuf:1.31.1'
api 'io.grpc:grpc-stub:1.31.1'

implementation group: 'com.google.guava', name: 'guava', version: '29.0-jre'
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.0'
implementation 'io.grpc:grpc-netty-shaded:1.30.2'
implementation group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.12.2'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.11.1'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.1'
implementation group: 'com.cronutils', name: 'cron-utils', version: '9.1.1'
implementation 'io.grpc:grpc-netty-shaded:1.31.1'
implementation group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.12.4'
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.11.2'
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.2'

testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
testImplementation group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
Expand All @@ -120,7 +120,7 @@ configurations.all {

license {
header rootProject.file('license-header.txt')
exclude 'io/temporal/proto/**.java' // generated code
exclude '**/*.puml'
}

task initSubmodules(type: Exec) {
Expand All @@ -143,11 +143,11 @@ jar {

protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.12.3'
artifact = 'com.google.protobuf:protoc:3.12.4'
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.30.2'
artifact = 'io.grpc:protoc-gen-grpc-java:1.31.1'
}
}
generateProtoTasks {
Expand All @@ -170,8 +170,8 @@ protobuf {

idea {
module {
sourceDirs += file("$buildDir/generated/main/java");
sourceDirs += file("$buildDir/generated/main/grpc");
sourceDirs += file("$buildDir/generated/main/java")
sourceDirs += file("$buildDir/generated/main/grpc")
}
}

Expand Down Expand Up @@ -327,15 +327,15 @@ allprojects {
// add a testlistener to all tasks of type Test
tasks.withType(Test) {
afterTest { TestDescriptor descriptor, TestResult result ->
if(result.resultType == org.gradle.api.tasks.testing.TestResult.ResultType.FAILURE){
if (result.resultType == org.gradle.api.tasks.testing.TestResult.ResultType.FAILURE) {
failedTests << ["${descriptor.className}::${descriptor.name}"]
}
}
}

// print out tracked failed tests when the build has finished
gradle.buildFinished {
if(!failedTests.empty){
if (!failedTests.empty) {
println "Failed tests for ${project.name}:"
failedTests.each { failedTest ->
println failedTest
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 2 additions & 0 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar


# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
Expand Down Expand Up @@ -129,6 +130,7 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`

JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
Expand Down
1 change: 1 addition & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ set CMD_LINE_ARGS=%*

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar


@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* Defines behaviour of the parent workflow when {@link CancellationScope} that wraps child workflow
* execution request is cancelled. The result of the cancellation independently of the type is a
* execution request is canceled. The result of the cancellation independently of the type is a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the same mess on GoSDK and server side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* {@link CanceledFailure} thrown from the child workflow method.
*/
public enum ActivityCancellationType {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/temporal/activity/ActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public ActivityOptions validateAndBuildWithDefaults() {
taskQueue,
retryOptions,
contextPropagators,
cancellationType);
cancellationType == null ? ActivityCancellationType.TRY_CANCEL : cancellationType);
mastermanu marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
28 changes: 25 additions & 3 deletions src/main/java/io/temporal/activity/LocalActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static LocalActivityOptions getDefaultInstance() {

public static final class Builder {
private Duration scheduleToCloseTimeout;
private Duration localRetryThreshold;
private Duration startToCloseTimeout;
private RetryOptions retryOptions;

Expand All @@ -57,6 +58,7 @@ private Builder(LocalActivityOptions options) {
return;
}
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.localRetryThreshold = options.getLocalRetryThreshold();
this.startToCloseTimeout = options.getStartToCloseTimeout();
this.retryOptions = options.retryOptions;
}
Expand All @@ -67,6 +69,15 @@ public Builder setScheduleToCloseTimeout(Duration scheduleToCloseTimeout) {
return this;
}

/**
* Maximum time to retry locally keeping workflow task open through heartbeat. Default is 6
* workflow task timeout.
*/
public Builder setLocalRetryThreshold(Duration localRetryThreshold) {
this.localRetryThreshold = localRetryThreshold;
return this;
}

public Builder setStartToCloseTimeout(Duration startToCloseTimeout) {
this.startToCloseTimeout = startToCloseTimeout;
return this;
Expand All @@ -93,24 +104,31 @@ public Builder setMethodRetry(MethodRetry r) {
}

public LocalActivityOptions build() {
return new LocalActivityOptions(scheduleToCloseTimeout, startToCloseTimeout, retryOptions);
return new LocalActivityOptions(
startToCloseTimeout, localRetryThreshold, scheduleToCloseTimeout, retryOptions);
}

public LocalActivityOptions validateAndBuildWithDefaults() {
RetryOptions ro = null;
if (retryOptions != null) {
ro = RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults();
}
return new LocalActivityOptions(scheduleToCloseTimeout, startToCloseTimeout, ro);
return new LocalActivityOptions(
startToCloseTimeout, localRetryThreshold, scheduleToCloseTimeout, ro);
}
}

private final Duration scheduleToCloseTimeout;
private final Duration localRetryThreshold;
private final Duration startToCloseTimeout;
private final RetryOptions retryOptions;

private LocalActivityOptions(
Duration scheduleToCloseTimeout, Duration startToCloseTimeout, RetryOptions retryOptions) {
Duration startToCloseTimeout,
Duration localRetryThreshold,
Duration scheduleToCloseTimeout,
RetryOptions retryOptions) {
this.localRetryThreshold = localRetryThreshold;
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.retryOptions = retryOptions;
Expand All @@ -120,6 +138,10 @@ public Duration getScheduleToCloseTimeout() {
return scheduleToCloseTimeout;
}

public Duration getLocalRetryThreshold() {
return localRetryThreshold;
}

public Duration getStartToCloseTimeout() {
return startToCloseTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
* Usually indicates that activity was already completed (duplicated request to complete) or timed
* out or workflow is closed.
*/
public final class ActivityCancelledException extends ActivityCompletionException {
public final class ActivityCanceledException extends ActivityCompletionException {

public ActivityCancelledException(ActivityInfo info) {
public ActivityCanceledException(ActivityInfo info) {
super(info);
}

public ActivityCancelledException() {
public ActivityCanceledException() {
super();
}
}
12 changes: 6 additions & 6 deletions src/main/java/io/temporal/client/WorkflowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Builder setWorkflowId(String workflowId) {
*
* <ul>
* AllowDuplicateFailedOnly is a default value. It means that workflow can start if
* previous run failed or was cancelled or terminated.
* previous run failed or was canceled or terminated.
* </ul>
*
* <ul>
Expand Down Expand Up @@ -263,15 +263,15 @@ public WorkflowOptions validateBuildWithDefaults() {

private final String taskQueue;

private RetryOptions retryOptions;
private final RetryOptions retryOptions;

private String cronSchedule;
private final String cronSchedule;

private Map<String, Object> memo;
private final Map<String, Object> memo;

private Map<String, Object> searchAttributes;
private final Map<String, Object> searchAttributes;

private List<ContextPropagator> contextPropagators;
private final List<ContextPropagator> contextPropagators;

private WorkflowOptions(
String workflowId,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/temporal/client/WorkflowStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ <R> CompletableFuture<R> getResultAsync(
* Request cancellation of a workflow execution.
*
* <p>Cancellation cancels {@link io.temporal.workflow.CancellationScope} that wraps the main
* workflow method. Note that workflow can take long time to get cancelled or even completely
* workflow method. Note that workflow can take long time to get canceled or even completely
* ignore the cancellation request.
*/
void cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/temporal/common/CronSchedule.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* supplied, and the workflow failed or timeout, the workflow will be retried based on the retry
* policy. While the workflow is retrying, it won't schedule its next run. If next schedule is due
* while workflow is running (or retrying), then it will skip that schedule. Cron workflow will not
* stop until it is terminated or cancelled.
* stop until it is terminated or canceled.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public final class EncodedValues implements Values {
private Optional<Payloads> payloads;
private DataConverter converter;
private Object[] values;
private final Object[] values;

public EncodedValues(Optional<Payloads> payloads, DataConverter converter) {
this.payloads = Objects.requireNonNull(payloads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,6 @@ void registerSignal(
void upsertSearchAttributes(Map<String, Object> searchAttributes);

Object newThread(Runnable runnable, boolean detached, String name);

long currentTimeMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,9 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
public Object newThread(Runnable runnable, boolean detached, String name) {
return next.newThread(runnable, detached, name);
}

@Override
public long currentTimeMillis() {
return next.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void setDataConverter(DataConverter converter) {
}

private static String getMessage(String message, String type, boolean nonRetryable) {
return (Strings.isNullOrEmpty(message) ? "" : "message='" + message + "\', ")
return (Strings.isNullOrEmpty(message) ? "" : "message='" + message + "', ")
+ "type='"
+ type
+ '\''
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/temporal/failure/CanceledFailure.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.common.converter.Values;
import java.util.Objects;

public final class CanceledFailure extends TemporalFailure {
private final Values details;

public CanceledFailure(String message, Values details, Throwable cause) {
super(message, message, cause);
this.details = details;
this.details = Objects.requireNonNull(details);
}

public CanceledFailure(String message, Object details) {
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/io/temporal/failure/FailureConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.temporal.api.failure.v1.ServerFailureInfo;
import io.temporal.api.failure.v1.TerminatedFailureInfo;
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.client.ActivityCancelledException;
import io.temporal.client.ActivityCanceledException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
import io.temporal.internal.common.CheckedExceptionWrapper;
Expand Down Expand Up @@ -66,11 +66,11 @@ public class FailureConverter {

private static final Pattern TRACE_ELEMENT_PATTERN = Pattern.compile(TRACE_ELEMENT_REGEXP);

public static Exception failureToException(Failure failure, DataConverter dataConverter) {
public static RuntimeException failureToException(Failure failure, DataConverter dataConverter) {
if (failure == null) {
return null;
}
Exception result = failureToExceptionImpl(failure, dataConverter);
RuntimeException result = failureToExceptionImpl(failure, dataConverter);
if (result instanceof TemporalFailure) {
((TemporalFailure) result).setFailure(failure);
}
Expand All @@ -81,8 +81,9 @@ public static Exception failureToException(Failure failure, DataConverter dataCo
return result;
}

private static Exception failureToExceptionImpl(Failure failure, DataConverter dataConverter) {
Exception cause =
private static RuntimeException failureToExceptionImpl(
Failure failure, DataConverter dataConverter) {
RuntimeException cause =
failure.hasCause() ? failureToException(failure.getCause(), dataConverter) : null;
switch (failure.getFailureInfoCase()) {
case APPLICATION_FAILURE_INFO:
Expand Down Expand Up @@ -257,7 +258,7 @@ public static Failure exceptionToFailureNoUnwrapping(Throwable e) {
.setWorkflowType(WorkflowType.newBuilder().setName(ce.getWorkflowType()))
.setWorkflowExecution(ce.getExecution());
failure.setChildWorkflowExecutionFailureInfo(info);
} else if (e instanceof ActivityCancelledException) {
} else if (e instanceof ActivityCanceledException) {
CanceledFailureInfo.Builder info = CanceledFailureInfo.newBuilder();
failure.setCanceledFailureInfo(info);
} else {
Expand Down
Loading