diff --git a/script/.env b/script/.env index d41fe7ae..6514696d 100644 --- a/script/.env +++ b/script/.env @@ -2,5 +2,5 @@ CASSANDRA_VERSION=3.11.9 ELASTICSEARCH_VERSION=7.16.2 MYSQL_VERSION=8 POSTGRESQL_VERSION=13 -TEMPORAL_VERSION=1.20.1 -TEMPORAL_UI_VERSION=2.10.1 \ No newline at end of file +TEMPORAL_VERSION=1.21.4 +TEMPORAL_UI_VERSION=2.18.0 \ No newline at end of file diff --git a/script/dynamicconfig/development-cass.yaml b/script/dynamicconfig/development-cass.yaml deleted file mode 100644 index 4b916163..00000000 --- a/script/dynamicconfig/development-cass.yaml +++ /dev/null @@ -1,3 +0,0 @@ -system.forceSearchAttributesCacheRefreshOnRead: - - value: true # Dev setup only. Please don't turn this on in production. - constraints: {} diff --git a/script/dynamicconfig/development-sql.yaml b/script/dynamicconfig/development-sql.yaml index 8862dfad..4d584e68 100644 --- a/script/dynamicconfig/development-sql.yaml +++ b/script/dynamicconfig/development-sql.yaml @@ -4,3 +4,5 @@ limit.maxIDLength: system.forceSearchAttributesCacheRefreshOnRead: - value: true # Dev setup only. Please don't turn this on in production. constraints: {} +frontend.enableUpdateWorkflowExecution: + - value: true \ No newline at end of file diff --git a/src/main/java/io/iworkflow/core/RPC.java b/src/main/java/io/iworkflow/core/RPC.java index f020eb37..c85da78f 100644 --- a/src/main/java/io/iworkflow/core/RPC.java +++ b/src/main/java/io/iworkflow/core/RPC.java @@ -25,11 +25,15 @@ // used when dataAttributesLoadingType is PARTIAL_WITHOUT_LOCKING String[] dataAttributesPartialLoadingKeys() default {}; + String[] dataAttributesLockingKeys() default {}; + PersistenceLoadingType searchAttributesLoadingType() default PersistenceLoadingType.ALL_WITHOUT_LOCKING; // used when searchAttributesPartialLoadingKeys is PARTIAL_WITHOUT_LOCKING String[] searchAttributesPartialLoadingKeys() default {}; + String[] searchAttributesLockingKeys() default {}; + /** * Only used when workflow has enabled {@link PersistenceOptions} CachingPersistenceByMemo * By default, it's false for high throughput support diff --git a/src/main/java/io/iworkflow/core/RpcInvocationHandler.java b/src/main/java/io/iworkflow/core/RpcInvocationHandler.java index ffde963a..50c8e5ae 100644 --- a/src/main/java/io/iworkflow/core/RpcInvocationHandler.java +++ b/src/main/java/io/iworkflow/core/RpcInvocationHandler.java @@ -56,9 +56,11 @@ public Object intercept(@AllArguments Object[] allArguments, final Object output = unregisteredClient.invokeRpc(outputType, input, workflowId, workflowRunId, method.getName(), rpcAnno.timeoutSeconds(), new PersistenceLoadingPolicy() .persistenceLoadingType(rpcAnno.dataAttributesLoadingType()) - .partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys())), + .partialLoadingKeys(Arrays.asList(rpcAnno.dataAttributesPartialLoadingKeys())) + .lockingKeys(Arrays.asList(rpcAnno.dataAttributesLockingKeys())), new PersistenceLoadingPolicy() .persistenceLoadingType(rpcAnno.searchAttributesLoadingType()) + .lockingKeys(Arrays.asList(rpcAnno.searchAttributesLockingKeys())) .partialLoadingKeys(Arrays.asList(rpcAnno.searchAttributesPartialLoadingKeys())), useMemo, searchAttributeKeyAndTypes diff --git a/src/test/java/io/iworkflow/integ/RpcTest.java b/src/test/java/io/iworkflow/integ/RpcTest.java index 73e91cf7..831323e1 100644 --- a/src/test/java/io/iworkflow/integ/RpcTest.java +++ b/src/test/java/io/iworkflow/integ/RpcTest.java @@ -5,7 +5,9 @@ import io.iworkflow.core.ClientOptions; import io.iworkflow.core.ClientSideException; import io.iworkflow.core.ImmutableStopWorkflowOptions; +import io.iworkflow.core.ImmutableWorkflowOptions; import io.iworkflow.gen.models.ErrorResponse; +import io.iworkflow.gen.models.WorkflowConfig; import io.iworkflow.gen.models.WorkflowStopType; import io.iworkflow.integ.persistence.BasicPersistenceWorkflow; import io.iworkflow.integ.rpc.NoStateWorkflow; @@ -17,9 +19,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_INT; import static io.iworkflow.integ.persistence.BasicPersistenceWorkflow.TEST_SEARCH_ATTRIBUTE_KEYWORD; @@ -37,6 +43,63 @@ public void setup() throws ExecutionException, InterruptedException { TestSingletonWorkerService.startWorkerIfNotUp(); } + @Test + public void testRPCLocking() throws InterruptedException, ExecutionException { + final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault); + final String wfId = "testRPCLocking" + System.currentTimeMillis() / 1000; + client.startWorkflow( + NoStateWorkflow.class, wfId, 1000, 999, + ImmutableWorkflowOptions.builder() + .workflowConfigOverride( + new WorkflowConfig() + .continueAsNewThreshold(1) + ) + .build()); + + final NoStateWorkflow rpcStub = client.newRpcStub(NoStateWorkflow.class, wfId, ""); + + ExecutorService executor = Executors.newFixedThreadPool(10); + final ArrayList> futures = new ArrayList<>(); + int total = 1000; + for (int i = 0; i < total; i++) { + + final Future future = executor.submit(() -> { + try { + return client.invokeRPC(rpcStub::increaseCounter); + } catch (ClientSideException e) { + if (e.getStatusCode() != 450) { + throw e; + } + } + return "fail"; + } + ); + futures.add(future); + } + + int succ = 0; + for (int i = 0; i < total; i++) { + ; + try { + final String done = futures.get(i).get(); + if (done.equals("done")) { + succ++; + } + } catch (Exception ignored) { + } + } + + Assertions.assertTrue(succ > 0); + Assertions.assertEquals(succ, client.invokeRPC(rpcStub::getCounter)); + + executor.shutdown(); + + // TODO make sure continue as new is happening when no state is executed + // https://github.com/indeedeng/iwf/issues/339 + + client.stopWorkflow(wfId, null); + } + @Test public void testRPCWorkflowFunc1() throws InterruptedException { final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault); diff --git a/src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java b/src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java index 40ece79f..b2de6940 100644 --- a/src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java +++ b/src/test/java/io/iworkflow/integ/rpc/NoStateWorkflow.java @@ -4,23 +4,64 @@ import io.iworkflow.core.ObjectWorkflow; import io.iworkflow.core.RPC; import io.iworkflow.core.communication.Communication; +import io.iworkflow.core.persistence.DataAttributeDef; import io.iworkflow.core.persistence.Persistence; +import io.iworkflow.core.persistence.PersistenceFieldDef; +import io.iworkflow.gen.models.PersistenceLoadingType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Arrays; +import java.util.List; + import static io.iworkflow.integ.RpcTest.RPC_OUTPUT; @Component public class NoStateWorkflow implements ObjectWorkflow { + public static final String DA_COUNTER = "counter"; private RpcWorkflow rpcWorkflow; private NoStartStateWorkflow noStartStateWorkflow; @Autowired - public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow ){ - this.rpcWorkflow=rpcWorkflow; + public NoStateWorkflow(RpcWorkflow rpcWorkflow, NoStartStateWorkflow noStartStateWorkflow) { + this.rpcWorkflow = rpcWorkflow; this.noStartStateWorkflow = noStartStateWorkflow; } + + @Override + public List getPersistenceSchema() { + return Arrays.asList( + DataAttributeDef.create(Integer.class, DA_COUNTER) + ); + } + + @RPC( + dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK, + dataAttributesPartialLoadingKeys = {DA_COUNTER}, + dataAttributesLockingKeys = {DA_COUNTER} + ) + public String increaseCounter(Context context, Persistence persistence, Communication communication) { + Integer current = persistence.getDataAttribute(DA_COUNTER, Integer.class); + if (current == null) { + current = 0; + } + current++; + persistence.setDataAttribute(DA_COUNTER, current); + return "done"; + } + + @RPC + public String testWrite(Context context, Persistence persistence, Communication communication) { + persistence.setDataAttribute(DA_COUNTER, 123); + return "done"; + } + + @RPC + public Integer getCounter(Context context, Persistence persistence, Communication communication) { + return persistence.getDataAttribute(DA_COUNTER, Integer.class); + } + @RPC public Long testRpcFunc1(Context context, String input, Persistence persistence, Communication communication) { if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty()) {