Skip to content

Commit

Permalink
Improve error handling and refactor to use new names/paths from IDL (#43
Browse files Browse the repository at this point in the history
)
  • Loading branch information
longquanzheng authored Oct 17, 2022
1 parent 31594a0 commit 8fa6a3c
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 51 deletions.
2 changes: 1 addition & 1 deletion iwf-idl
75 changes: 42 additions & 33 deletions src/main/java/io/github/cadenceoss/iwf/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class Client {
private final Registry registry;
Expand Down Expand Up @@ -42,7 +43,7 @@ public String StartWorkflow(
final String wfType = workflowClass.getSimpleName();
final StateDef stateDef = registry.getWorkflowState(wfType, startStateId);
if (stateDef == null || !stateDef.getCanStartWorkflow()) {
throw new RuntimeException("invalid start stateId " + startStateId);
throw new IllegalArgumentException("invalid start stateId " + startStateId);
}

WorkflowStartResponse workflowStartResponse = defaultApi.apiV1WorkflowStartPost(new WorkflowStartRequest()
Expand All @@ -64,11 +65,11 @@ public String StartWorkflow(
* @return
* @param <T> type of the output
*/
public <T> T GetSimpleWorkflowResultWithLongWait(
public <T> T GetSimpleWorkflowResultWithWait(
Class<T> valueClass,
final String workflowId,
final String workflowRunId) {
WorkflowGetResponse workflowGetResponse = defaultApi.apiV1WorkflowGetWithLongWaitPost(
WorkflowGetResponse workflowGetResponse = defaultApi.apiV1WorkflowGetWithWaitPost(
new WorkflowGetRequest()
.needsResults(true)
.workflowId(workflowId)
Expand All @@ -79,17 +80,15 @@ public <T> T GetSimpleWorkflowResultWithLongWait(
Preconditions.checkNotNull(workflowGetResponse.getResults(), checkErrorMessage);
Preconditions.checkArgument(workflowGetResponse.getResults().size() == 1, checkErrorMessage);
Preconditions.checkNotNull(workflowGetResponse.getResults().get(0).getCompletedStateOutput(), checkErrorMessage);

//TODO validate encoding type


final StateCompletionOutput output = workflowGetResponse.getResults().get(0);
return objectEncoder.decode(output.getCompletedStateOutput(), valueClass);
}

public <T> T GetSimpleWorkflowResultWithLongWait(
public <T> T GetSimpleWorkflowResultWithWait(
Class<T> valueClass,
final String workflowId) {
return GetSimpleWorkflowResultWithLongWait(valueClass, workflowId, "");
return GetSimpleWorkflowResultWithWait(valueClass, workflowId, "");
}

/**
Expand All @@ -98,9 +97,9 @@ public <T> T GetSimpleWorkflowResultWithLongWait(
* @param workflowRunId
* @return a list of the state output for completion states. User code will figure how to use ObjectEncoder to decode the output
*/
public List<StateCompletionOutput> GetComplexWorkflowResultWithLongWait(
public List<StateCompletionOutput> GetComplexWorkflowResultWithWait(
final String workflowId, final String workflowRunId) {
WorkflowGetResponse workflowGetResponse = defaultApi.apiV1WorkflowGetWithLongWaitPost(
WorkflowGetResponse workflowGetResponse = defaultApi.apiV1WorkflowGetWithWaitPost(
new WorkflowGetRequest()
.needsResults(true)
.workflowId(workflowId)
Expand All @@ -110,8 +109,8 @@ public List<StateCompletionOutput> GetComplexWorkflowResultWithLongWait(
return workflowGetResponse.getResults();
}

public List<StateCompletionOutput> GetComplexWorkflowResultWithLongWait(final String workflowId) {
return GetComplexWorkflowResultWithLongWait(workflowId, "");
public List<StateCompletionOutput> GetComplexWorkflowResultWithWait(final String workflowId) {
return GetComplexWorkflowResultWithWait(workflowId, "");
}
public void SignalWorkflow(
final Class<? extends Workflow> workflowClass,
Expand All @@ -120,17 +119,20 @@ public void SignalWorkflow(
final String signalChannelName,
final Object signalValue) {
final String wfType = workflowClass.getSimpleName();
if (registry.getSignalChannelNameToSignalTypeMap(wfType) == null) {
throw new RuntimeException(String.format("Workflow %s doesn't have any registered signal channels", wfType));
}

Map<String, Class<?>> nameToTypeMap = registry.getSignalChannelNameToSignalTypeMap(wfType);
if (nameToTypeMap == null) {
throw new IllegalArgumentException(
String.format("Workflow %s is not registered", wfType)
);
}

if (!nameToTypeMap.containsKey(signalChannelName)) {
throw new RuntimeException(String.format("Workflow %s doesn't have signal %s", wfType, signalChannelName));
throw new IllegalArgumentException(String.format("Workflow %s doesn't have signal %s", wfType, signalChannelName));
}
Class<?> signalType = nameToTypeMap.get(signalChannelName);
if (!signalType.isInstance(signalValue)) {
throw new RuntimeException(String.format("Signal value is not of type %s", signalType.getName()));
throw new IllegalArgumentException(String.format("Signal value is not of type %s", signalType.getName()));
}

defaultApi.apiV1WorkflowSignalPost(new WorkflowSignalRequest()
Expand Down Expand Up @@ -178,31 +180,38 @@ public String ResetWorkflow(
return resp.getWorkflowRunId();
}

public Map<String, Object> queryWorkflow(
public Map<String, Object> getWorkflowQueryAttributes(
final Class<? extends Workflow> workflowClass,
final String workflowId,
final String workflowRunId,
List<String> attributeKeys) {
if (attributeKeys == null || attributeKeys.isEmpty()) {
throw new IllegalArgumentException("attributeKeys must contain at least one entry, or use getAllQueryAttributes API to get all");
}
return doGetWorkflowQueryAttributes(workflowClass, workflowId, workflowRunId, attributeKeys);
}

private Map<String, Object> doGetWorkflowQueryAttributes(
final Class<? extends Workflow> workflowClass,
final String workflowId,
final String workflowRunId,
List<String> attributeKeys) {
final String wfType = workflowClass.getSimpleName();
if (registry.getWorkflow(wfType) == null) {
throw new RuntimeException(

Map<String, Class<?>> queryAttributeKeyToTypeMap = registry.getQueryAttributeKeyToTypeMap(wfType);
if (queryAttributeKeyToTypeMap == null) {
throw new IllegalArgumentException(
String.format("Workflow %s is not registered", wfType)
);
}
if (registry.getQueryAttributeKeyToTypeMap(wfType) == null) {
throw new RuntimeException(
String.format("Workflow %s doesn't have any registered query attribute", wfType)
);
}

Map<String, Class<?>> queryAttributeKeyToTypeMap = registry.getQueryAttributeKeyToTypeMap(wfType);
// if attribute keys is null or empty, iwf server will return all query attributes
if (attributeKeys != null && !attributeKeys.isEmpty()) {
List<String> nonExistingQueryAttributeList = attributeKeys.stream()
.filter(s -> !queryAttributeKeyToTypeMap.containsKey(s))
.toList();
.collect(Collectors.toList());
if (!nonExistingQueryAttributeList.isEmpty()) {
throw new RuntimeException(
throw new IllegalArgumentException(
String.format(
"Query attributes not registered: %s",
String.join(", ", nonExistingQueryAttributeList)
Expand All @@ -211,15 +220,15 @@ public Map<String, Object> queryWorkflow(
}
}

WorkflowQueryResponse response = defaultApi.apiV1WorkflowQueryPost(
new WorkflowQueryRequest()
WorkflowGetQueryAttributesResponse response = defaultApi.apiV1WorkflowQueryattributesGetPost(
new WorkflowGetQueryAttributesRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.attributeKeys(attributeKeys)
);

if (response.getQueryAttributes() == null) {
throw new RuntimeException("query attributes not returned");
throw new InternalServiceException("query attributes not returned");
}
Map<String, Object> result = new HashMap<>();
for (KeyValue keyValue: response.getQueryAttributes()) {
Expand All @@ -233,10 +242,10 @@ public Map<String, Object> queryWorkflow(
return result;
}

public Map<String, Object> queryAllQueryAttributes(
public Map<String, Object> getAllQueryAttributes(
final Class<? extends Workflow> workflowClass,
final String workflowId,
final String workflowRunId) {
return queryWorkflow(workflowClass, workflowId, workflowRunId, null);
return doGetWorkflowQueryAttributes(workflowClass, workflowId, workflowRunId, null);
}
}
16 changes: 12 additions & 4 deletions src/main/java/io/github/cadenceoss/iwf/core/Registry.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ private void registerWorkflow(final Workflow wf) {
String workflowType = wf.getClass().getSimpleName();

if (workflowStore.containsKey(workflowType)) {
throw new RuntimeException(String.format("Workflow type %s already exists", workflowType));
throw new WorkflowDefinitionException(String.format("Workflow type %s already exists", workflowType));
}
workflowStore.put(workflowType, wf);
}

private void registerWorkflowState(final Workflow wf) {
String workflowType = wf.getClass().getSimpleName();
if(wf.getStates() == null || wf.getStates().size() == 0){
throw new WorkflowDefinitionException(String.format("Workflow type %s must contain at least one state", workflowType));
}
for (StateDef stateDef: wf.getStates()) {
String key = getStateDefKey(workflowType, stateDef.getWorkflowState().getStateId());
if (workflowStateStore.containsKey(key)) {
throw new RuntimeException(String.format("Workflow state definition %s already exists", key));
throw new WorkflowDefinitionException(String.format("Workflow state definition %s already exists", key));
} else {
workflowStateStore.put(key, stateDef);
}
Expand All @@ -45,11 +48,16 @@ private void registerWorkflowState(final Workflow wf) {

private void registerWorkflowSignal(final Workflow wf) {
String workflowType = wf.getClass().getSimpleName();
if (wf.getSignalChannels() == null || wf.getSignalChannels().isEmpty()) {
signalTypeStore.put(workflowType, new HashMap<>());
return;
}

for (SignalChannelDef signalChannelDef: wf.getSignalChannels()) {
Map<String, Class<?>> signalNameToTypeMap =
signalTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (signalNameToTypeMap.containsKey(signalChannelDef.getSignalChannelName())) {
throw new RuntimeException(
throw new WorkflowDefinitionException(
String.format("Signal channel name %s already exists", signalChannelDef.getSignalChannelName()));
}
signalNameToTypeMap.put(signalChannelDef.getSignalChannelName(), signalChannelDef.getSignalValueType());
Expand All @@ -67,7 +75,7 @@ private void registerWorkflowQueryAttributes(final Workflow wf) {
Map<String, Class<?>> queryAttributeKeyToTypeMap =
queryAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (queryAttributeKeyToTypeMap.containsKey(queryAttributeDef.getQueryAttributeKey())) {
throw new RuntimeException(
throw new WorkflowDefinitionException(
String.format(
"Query attribute key %s already exists",
queryAttributeDef.getQueryAttributeKey())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public QueryAttributesRWImpl(
@Override
public <T> T get(String key, Class<T> type) {
if (!queryAttributeNameToTypeMap.containsKey(key)) {
throw new RuntimeException(String.format("Query attribute %s is not registered", key));
throw new IllegalArgumentException(String.format("Query attribute %s is not registered", key));
}
if (!queryAttributeNameToEncodedObjectMap.containsKey(key)) {
return null;
}

Class<?> registeredType = queryAttributeNameToTypeMap.get(key);
if (!type.isAssignableFrom(registeredType)) {
throw new RuntimeException(
throw new IllegalArgumentException(
String.format(
"%s is not assignable from registered type %s",
type.getName(),
Expand All @@ -50,12 +50,12 @@ public <T> T get(String key, Class<T> type) {
@Override
public void set(String key, Object value) {
if (!queryAttributeNameToTypeMap.containsKey(key)) {
throw new RuntimeException(String.format("Query attribute %s is not registered", key));
throw new IllegalArgumentException(String.format("Query attribute %s is not registered", key));
}

Class<?> registeredType = queryAttributeNameToTypeMap.get(key);
if (!registeredType.isInstance(value)) {
throw new RuntimeException(String.format("Input is not an instance of class %s", registeredType.getName()));
throw new IllegalArgumentException(String.format("Input is not an instance of class %s", registeredType.getName()));
}

this.queryAttributeNameToEncodedObjectMap.put(key, objectEncoder.encode(value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
import io.github.cadenceoss.iwf.core.Workflow;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributeDef;

import java.util.Arrays;
import java.util.List;

public class BasicQueryWorkflow implements Workflow {
public static final String ATTRIBUTE_KEY = "text";
@Override
public List<StateDef> getStates() {
return List.of(StateDef.startingState(new BasicQueryWorkflowState1()));
return Arrays.asList(StateDef.startingState(new BasicQueryWorkflowState1()));
}

@Override
public List<QueryAttributeDef> getQueryAttributes() {
return List.of(QueryAttributeDef.create(String.class, ATTRIBUTE_KEY));
return Arrays.asList(QueryAttributeDef.create(String.class, ATTRIBUTE_KEY));
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package io.github.cadenceoss.iwf.integ.basic;

import io.github.cadenceoss.iwf.core.*;
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
import io.github.cadenceoss.iwf.spring.TestSingletonWorkerService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Arrays;
import java.util.Map;

public class BasicTest {
Expand All @@ -29,7 +28,7 @@ public void testBasicWorkflow() throws InterruptedException {
final Integer input = Integer.valueOf(0);
client.StartWorkflow(BasicWorkflow.class, BasicWorkflowS1.StateId, input, wfId, startOptions);
// wait for workflow to finish
final Integer output = client.GetSimpleWorkflowResultWithLongWait(Integer.class, wfId);
final Integer output = client.GetSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(input + 2, output);
}

Expand All @@ -47,7 +46,7 @@ public void testBasicSignalWorkflow() throws InterruptedException {
BasicSignalWorkflow.class, BasicSignalWorkflowState1.STATE_ID, input, wfId, startOptions);
client.SignalWorkflow(
BasicSignalWorkflow.class, wfId, runId, BasicSignalWorkflowState1.SIGNAL_CHANNEL_NAME, Integer.valueOf(2));
final Integer output = client.GetSimpleWorkflowResultWithLongWait(Integer.class, wfId);
final Integer output = client.GetSimpleWorkflowResultWithWait(Integer.class, wfId);
Assertions.assertEquals(3, output);
}

Expand All @@ -61,13 +60,13 @@ public void testBasicQueryWorkflow() throws InterruptedException {
final WorkflowStartOptions startOptions = WorkflowStartOptions.minimum(10);
final String runId = client.StartWorkflow(
BasicQueryWorkflow.class, BasicQueryWorkflowState1.STATE_ID, "start", wfId, startOptions);
final String output = client.GetSimpleWorkflowResultWithLongWait(String.class, wfId);
final String output = client.GetSimpleWorkflowResultWithWait(String.class, wfId);
Map<String, Object> map =
client.queryWorkflow(BasicQueryWorkflow.class, wfId, runId, List.of(BasicQueryWorkflow.ATTRIBUTE_KEY));
client.getWorkflowQueryAttributes(BasicQueryWorkflow.class, wfId, runId, Arrays.asList(BasicQueryWorkflow.ATTRIBUTE_KEY));
Assertions.assertEquals(
"query-start-query-decide", map.get(BasicQueryWorkflow.ATTRIBUTE_KEY));
Map<String, Object> allQueryAttributes =
client.queryAllQueryAttributes(BasicQueryWorkflow.class, wfId, runId);
client.getAllQueryAttributes(BasicQueryWorkflow.class, wfId, runId);
Assertions.assertEquals(
"query-start-query-decide", allQueryAttributes.get(BasicQueryWorkflow.ATTRIBUTE_KEY));
Assertions.assertEquals(
Expand Down

0 comments on commit 8fa6a3c

Please sign in to comment.