Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Saga pattern support in java-sdk #956

Merged
merged 20 commits into from
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
90e6ba0
first versoin of saga pattern support based on workflow
skyao Oct 24, 2023
221438b
add unit test for SagaConfiguration to improve code coverage
skyao Oct 24, 2023
18c61ac
Merge branch 'master' into saga
artursouza Nov 13, 2023
f0ab98d
save draft version before refactory to not hide saga.registerCompensa…
skyao Nov 16, 2023
551ce5a
remove auto register compensation activity on callActivity()
skyao Nov 16, 2023
ff1b569
update branch
skyao Nov 16, 2023
a9a0712
Merge branch 'master' into saga-in-workflow
skyao Nov 17, 2023
fb9470a
rollback COVEREDRATIO to 80%
skyao Nov 17, 2023
5d5ad6a
improve code implementation accordings to proposal
skyao Nov 21, 2023
f6231ba
Merge branch 'master' into saga-in-workflow
skyao Nov 30, 2023
d5980e5
Merge branch 'dapr:master' into saga-in-workflow
skyao Dec 1, 2023
490e9ff
use ctx.allOf() to do compensation in parallel
skyao Dec 1, 2023
34f9ab3
Merge branch 'saga-in-workflow' of github.com:skyao/java-sdk into sag…
skyao Dec 1, 2023
3db12a4
Merge branch 'master' into saga-in-workflow
mukundansundar Dec 11, 2023
e63996b
Merge branch 'master' into saga-in-workflow
artursouza Jan 5, 2024
c4ade6e
Merge branch 'master' into saga-in-workflow
skyao Jan 9, 2024
f8ca83a
add code to handle ContinueAsNewInterruption exception for saga compe…
skyao Jan 9, 2024
5dbe494
Merge remote-tracking branch 'upstream/master' into saga-in-workflow
skyao Jan 13, 2024
1872ba4
add saga context for saga related method
skyao Jan 13, 2024
59e76f1
fix for checkstyle
skyao Jan 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import io.dapr.workflows.saga.DaprSagaContextImpl;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;
Expand All @@ -32,6 +35,7 @@
public class DaprWorkflowContextImpl implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
private final Logger logger;
private final Saga saga;

/**
* Constructor for DaprWorkflowContextImpl.
Expand All @@ -51,6 +55,23 @@ public DaprWorkflowContextImpl(TaskOrchestrationContext context) throws IllegalA
* @throws IllegalArgumentException if context or logger is null
*/
public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
this(context, logger, null);
}

public DaprWorkflowContextImpl(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException {
skyao marked this conversation as resolved.
Show resolved Hide resolved
this(context, LoggerFactory.getLogger(WorkflowContext.class), saga);
}

/**
* Constructor for DaprWorkflowContextImpl.
*
* @param context TaskOrchestrationContext
* @param logger Logger
* @param saga saga object, if null, saga is disabled
* @throws IllegalArgumentException if context or logger is null
*/
public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger, Saga saga)
throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
}
Expand All @@ -60,6 +81,7 @@ public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger)

this.innerContext = context;
this.logger = logger;
this.saga = saga;
}

/**
Expand Down Expand Up @@ -110,26 +132,33 @@ public <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V>
}

/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* Waits for an event to be raised named {@code name} and returns a {@link Task}
* that completes when the event is
* received or is canceled when {@code timeout} expires.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full
* description.
*
* @param name the case-insensitive name of the event to wait for
* @param timeout the amount of time to wait before canceling the returned {@code Task}
* @return a new {@link Task} that completes when the external event is received or when {@code timeout} expires
* @throws TaskCanceledException if the specified {@code timeout} value expires before the event is received
* @param timeout the amount of time to wait before canceling the returned
* {@code Task}
* @return a new {@link Task} that completes when the external event is received
* or when {@code timeout} expires
* @throws TaskCanceledException if the specified {@code timeout} value expires
* before the event is received
*/
@Override
public <V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException {
return this.innerContext.waitForExternalEvent(name, timeout, Void.class);
}

/**
* Waits for an event to be raised named {@code name} and returns a {@link Task} that completes when the event is
* Waits for an event to be raised named {@code name} and returns a {@link Task}
* that completes when the event is
* received.
*
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full description.
* <p>See {@link #waitForExternalEvent(String, Duration, Class)} for a full
* description.
*
* @param name the case-insensitive name of the event to wait for
* @return a new {@link Task} that completes when the external event is received
Expand Down Expand Up @@ -172,7 +201,6 @@ public Task<Void> createTimer(Duration duration) {
return this.innerContext.createTimer(duration);
}


/**
* {@inheritDoc}
*/
Expand All @@ -185,7 +213,7 @@ public <T> T getInput(Class<T> targetType) {
*/
@Override
public <V> Task<V> callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) {
@Nullable TaskOptions options, Class<V> returnType) {

return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType);
}
Expand Down Expand Up @@ -213,4 +241,13 @@ public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
public UUID newUuid() {
return this.innerContext.newUUID();
}

@Override
public SagaContext getSagaContext() {
if (this.saga == null) {
throw new UnsupportedOperationException("Saga is not enabled");
}

return new DaprSagaContextImpl(this.saga, this);
}
}
51 changes: 48 additions & 3 deletions sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@

package io.dapr.workflows;

import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOption;

/**
* Common interface for workflow implementations.
*/
public abstract class Workflow {
public Workflow(){
public Workflow() {
}

/**
Expand All @@ -30,10 +35,50 @@ public Workflow(){
/**
* Executes the workflow logic.
*
* @param ctx provides access to methods for scheduling durable tasks and getting information about the current
* @param ctx provides access to methods for scheduling durable tasks and
* getting information about the current
* workflow instance.
*/
public void run(WorkflowContext ctx) {
this.create().run(ctx);
WorkflowStub stub = this.create();

if (!this.isSagaEnabled()) {
// saga disabled
stub.run(ctx);
} else {
// saga enabled
try {
stub.run(ctx);
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
throw e;
} catch (SagaCompensationException e) {
// Saga compensation is triggered gracefully but failed in exception
// don't need to trigger compensation again
throw e;
} catch (Exception e) {
try {
ctx.getSagaContext().compensate();
} catch (Exception se) {
se.addSuppressed(e);
throw se;
}

throw e;
}
}
}

public boolean isSagaEnabled() {
return this.getSagaOption() != null;
}

/**
* get saga configuration.
*
* @return saga configuration
*/
public SagaOption getSagaOption() {
// by default, saga is disabled
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import com.microsoft.durabletask.TaskOptions;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -530,4 +531,12 @@ default void continueAsNew(Object input) {
default UUID newUuid() {
throw new RuntimeException("No implementation found.");
}

/**
* get saga context.
*
* @return saga context
* @throws UnsupportedOperationException if saga is not enabled.
*/
SagaContext getSagaContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package io.dapr.workflows;

import io.dapr.workflows.WorkflowContext;

@FunctionalInterface
public interface WorkflowStub {
void run(WorkflowContext ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.DaprWorkflowContextImpl;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -55,7 +56,13 @@
String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
);
}
workflow.run(new DaprWorkflowContextImpl(ctx));

if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DaprWorkflowContextImpl(ctx, saga));
} else {

Check warning on line 63 in sdk-workflows/src/main/java/io/dapr/workflows/runtime/OrchestratorWrapper.java

View check run for this annotation

Codecov / codecov/patch

sdk-workflows/src/main/java/io/dapr/workflows/runtime/OrchestratorWrapper.java#L61-L63

Added lines #L61 - L63 were not covered by tests
workflow.run(new DaprWorkflowContextImpl(ctx));
}
};

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows.saga;

import com.microsoft.durabletask.TaskOptions;

/**
* Information for a compensation activity.
*/
class CompensatationInformation {
private final String compensatationActivityClassName;
private final Object compensatationActivityInput;
private final TaskOptions taskOptions;

/**
* Constructor for a compensation information.
*
* @param compensatationActivityClassName Class name of the activity to do
* compensatation.
* @param compensatationActivityInput Input of the activity to do
* compensatation.
* @param taskOptions task options to set retry strategy
*/
public CompensatationInformation(String compensatationActivityClassName,
Object compensatationActivityInput, TaskOptions taskOptions) {
this.compensatationActivityClassName = compensatationActivityClassName;
this.compensatationActivityInput = compensatationActivityInput;
this.taskOptions = taskOptions;
}

/**
* Gets the class name of the activity.
*
* @return the class name of the activity.
*/
public String getCompensatationActivityClassName() {
return compensatationActivityClassName;
}

/**
* Gets the input of the activity.
*
* @return the input of the activity.
*/
public Object getCompensatationActivityInput() {
return compensatationActivityInput;
}

/**
* get task options.
*
* @return task options, null if not set
*/
public TaskOptions getTaskOptions() {
return taskOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.workflows.saga;

import io.dapr.workflows.WorkflowContext;

/**
* Dapr Saga Context implementation.
*/
public class DaprSagaContextImpl implements SagaContext {

private final Saga saga;
private final WorkflowContext workflowContext;

/**
* Constructor to build up instance.
*
* @param saga Saga instance.
* @param workflowContext Workflow context.
* @throws IllegalArgumentException if saga or workflowContext is null.
*/
public DaprSagaContextImpl(Saga saga, WorkflowContext workflowContext) {
if (saga == null) {
throw new IllegalArgumentException("Saga should not be null");
}
if (workflowContext == null) {
throw new IllegalArgumentException("workflowContext should not be null");
}

this.saga = saga;
this.workflowContext = workflowContext;
}

@Override
public void registerCompensation(String activityClassName, Object activityInput) {
this.saga.registerCompensation(activityClassName, activityInput);
}

@Override
public void compensate() {
this.saga.compensate(workflowContext);
}
}
Loading
Loading