Skip to content

Commit

Permalink
Support persistence locking in RPC (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Aug 25, 2023
1 parent 33cb531 commit 7835a7c
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 8 deletions.
4 changes: 2 additions & 2 deletions script/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ CASSANDRA_VERSION=3.11.9
ELASTICSEARCH_VERSION=7.16.2
MYSQL_VERSION=8
POSTGRESQL_VERSION=13
TEMPORAL_VERSION=1.20.1
TEMPORAL_UI_VERSION=2.10.1
TEMPORAL_VERSION=1.21.4
TEMPORAL_UI_VERSION=2.18.0
3 changes: 0 additions & 3 deletions script/dynamicconfig/development-cass.yaml

This file was deleted.

2 changes: 2 additions & 0 deletions script/dynamicconfig/development-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ limit.maxIDLength:
system.forceSearchAttributesCacheRefreshOnRead:
- value: true # Dev setup only. Please don't turn this on in production.
constraints: {}
frontend.enableUpdateWorkflowExecution:
- value: true
4 changes: 4 additions & 0 deletions src/main/java/io/iworkflow/core/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
// used when dataAttributesLoadingType is PARTIAL_WITHOUT_LOCKING
String[] dataAttributesPartialLoadingKeys() default {};

String[] dataAttributesLockingKeys() default {};

PersistenceLoadingType searchAttributesLoadingType() default PersistenceLoadingType.ALL_WITHOUT_LOCKING;

// used when searchAttributesPartialLoadingKeys is PARTIAL_WITHOUT_LOCKING
String[] searchAttributesPartialLoadingKeys() default {};

String[] searchAttributesLockingKeys() default {};

/**
* Only used when workflow has enabled {@link PersistenceOptions} CachingPersistenceByMemo
* By default, it's false for high throughput support
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/iworkflow/core/RpcInvocationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public Object intercept(@AllArguments Object[] allArguments,
final Object output = unregisteredClient.invokeRpc(outputType, input, workflowId, workflowRunId, method.getName(), rpcAnno.timeoutSeconds(),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.dataAttributesLoadingType())
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys())),
.partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys()))
.lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())),
new PersistenceLoadingPolicy()
.persistenceLoadingType(rpcAnno.searchAttributesLoadingType())
.lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys()))
.partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())),
useMemo,
searchAttributeKeyAndTypes
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/io/iworkflow/integ/RpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import io.iworkflow.core.ClientOptions;
import io.iworkflow.core.ClientSideException;
import io.iworkflow.core.ImmutableStopWorkflowOptions;
import io.iworkflow.core.ImmutableWorkflowOptions;
import io.iworkflow.gen.models.ErrorResponse;
import io.iworkflow.gen.models.WorkflowConfig;
import io.iworkflow.gen.models.WorkflowStopType;
import io.iworkflow.integ.persistence.BasicPersistenceWorkflow;
import io.iworkflow.integ.rpc.NoStateWorkflow;
Expand All @@ -17,9 +19,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_INT;
import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_KEYWORD;
Expand All @@ -37,6 +43,63 @@ public void setup() throws ExecutionException, InterruptedException {
TestSingletonWorkerService.startWorkerIfNotUp();
}

@Test
public void testRPCLocking() throws InterruptedException, ExecutionException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "testRPCLocking" + System.currentTimeMillis() / 1000;
client.startWorkflow(
NoStateWorkflow.class, wfId, 1000, 999,
ImmutableWorkflowOptions.builder()
.workflowConfigOverride(
new WorkflowConfig()
.continueAsNewThreshold(1)
)
.build());

final NoStateWorkflow rpcStub = client.newRpcStub(NoStateWorkflow.class, wfId, "");

ExecutorService executor = Executors.newFixedThreadPool(10);
final ArrayList<Future<String>> futures = new ArrayList<>();
int total = 1000;
for (int i = 0; i < total; i++) {

final Future<String> future = executor.submit(() -> {
try {
return client.invokeRPC(rpcStub::increaseCounter);
} catch (ClientSideException e) {
if (e.getStatusCode() != 450) {
throw e;
}
}
return "fail";
}
);
futures.add(future);
}

int succ = 0;
for (int i = 0; i < total; i++) {
;
try {
final String done = futures.get(i).get();
if (done.equals("done")) {
succ++;
}
} catch (Exception ignored) {
}
}

Assertions.assertTrue(succ > 0);
Assertions.assertEquals(succ, client.invokeRPC(rpcStub::getCounter));

executor.shutdown();

// TODO make sure continue as new is happening when no state is executed
// https://github.com/indeedeng/iwf/issues/339

client.stopWorkflow(wfId, null);
}

@Test
public void testRPCWorkflowFunc1() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
Expand Down
45 changes: 43 additions & 2 deletions src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,64 @@
import io.iworkflow.core.ObjectWorkflow;
import io.iworkflow.core.RPC;
import io.iworkflow.core.communication.Communication;
import io.iworkflow.core.persistence.DataAttributeDef;
import io.iworkflow.core.persistence.Persistence;
import io.iworkflow.core.persistence.PersistenceFieldDef;
import io.iworkflow.gen.models.PersistenceLoadingType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

import static io.iworkflow.integ.RpcTest.RPC_OUTPUT;

@Component
public class NoStateWorkflow implements ObjectWorkflow {

public static final String DA_COUNTER = "counter";
private RpcWorkflow rpcWorkflow;
private NoStartStateWorkflow noStartStateWorkflow;

@Autowired
public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow ){
this.rpcWorkflow=rpcWorkflow;
public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow) {
this.rpcWorkflow = rpcWorkflow;
this.noStartStateWorkflow = noStartStateWorkflow;
}

@Override
public List<PersistenceFieldDef> getPersistenceSchema() {
return Arrays.asList(
DataAttributeDef.create(Integer.class, DA_COUNTER)
);
}

@RPC(
dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK,
dataAttributesPartialLoadingKeys = {DA_COUNTER},
dataAttributesLockingKeys = {DA_COUNTER}
)
public String increaseCounter(Context context, Persistence persistence, Communication communication) {
Integer current = persistence.getDataAttribute(DA_COUNTER, Integer.class);
if (current == null) {
current = 0;
}
current++;
persistence.setDataAttribute(DA_COUNTER, current);
return "done";
}

@RPC
public String testWrite(Context context, Persistence persistence, Communication communication) {
persistence.setDataAttribute(DA_COUNTER, 123);
return "done";
}

@RPC
public Integer getCounter(Context context, Persistence persistence, Communication communication) {
return persistence.getDataAttribute(DA_COUNTER, Integer.class);
}

@RPC
public Long testRpcFunc1(Context context, String input, Persistence persistence, Communication communication) {
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {
Expand Down

0 comments on commit 7835a7c

Please sign in to comment.