Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add-query-workflow-support #37

Merged
merged 3 commits into from
Oct 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/main/java/io/github/cadenceoss/iwf/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.github.cadenceoss.iwf.gen.api.ApiClient;
import io.github.cadenceoss.iwf.gen.api.DefaultApi;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -177,4 +178,65 @@ public String ResetWorkflow(
return resp.getWorkflowRunId();
}

public Map<String, Object> queryWorkflow(
final Class<? extends Workflow> workflowClass,
final String workflowId,
final String workflowRunId,
List<String> attributeKeys) {
final String wfType = workflowClass.getSimpleName();
if (registry.getWorkflow(wfType) == null) {
throw new RuntimeException(
String.format("Workflow %s is not registered", wfType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be great to throw different exception exceptions based on input error, workflow error or internal error: #42

But we can also improve in a separate PR.

);
}
if (registry.getQueryAttributeKeyToTypeMap(wfType) == null) {
throw new RuntimeException(
String.format("Workflow %s doesn't have any registered query attribute", wfType)
);
}

Map<String, Class<?>> queryAttributeKeyToTypeMap = registry.getQueryAttributeKeyToTypeMap(wfType);
// if attribute keys is null or empty, iwf server will return all query attributes
if (attributeKeys != null && !attributeKeys.isEmpty()) {
List<String> nonExistingQueryAttributeList = attributeKeys.stream()
.filter(s -> !queryAttributeKeyToTypeMap.containsKey(s))
.toList();
if (!nonExistingQueryAttributeList.isEmpty()) {
throw new RuntimeException(
String.format(
"Query attributes not registered: %s",
String.join(", ", nonExistingQueryAttributeList)
)
);
}
}

WorkflowQueryResponse response = defaultApi.apiV1WorkflowQueryPost(
new WorkflowQueryRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.attributeKeys(attributeKeys)
);

if (response.getQueryAttributes() == null) {
throw new RuntimeException("query attributes not returned");
}
Map<String, Object> result = new HashMap<>();
for (KeyValue keyValue: response.getQueryAttributes()) {
if (keyValue.getValue() != null) {
result.put(
keyValue.getKey(),
objectEncoder.decode(keyValue.getValue(), queryAttributeKeyToTypeMap.get(keyValue.getKey()))
);
}
}
return result;
}

public Map<String, Object> queryAllQueryAttributes(
final Class<? extends Workflow> workflowClass,
final String workflowId,
final String workflowRunId) {
return queryWorkflow(workflowClass, workflowId, workflowRunId, null);
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/github/cadenceoss/iwf/core/Registry.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.command.SignalChannelDef;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributeDef;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -10,13 +11,15 @@ public class Registry {
// (workflow type, stateId)-> StateDef
private final Map<String, StateDef> workflowStateStore = new HashMap<>();
private final Map<String, Map<String, Class<?>>> signalTypeStore = new HashMap<>();
private final Map<String, Map<String, Class<?>>> queryAttributeTypeStore = new HashMap<>();

private static final String DELIMITER = "_";

public void addWorkflow(final Workflow wf){
registerWorkflow(wf);
registerWorkflowState(wf);
registerWorkflowSignal(wf);
registerWorkflowQueryAttributes(wf);
}

private void registerWorkflow(final Workflow wf) {
Expand Down Expand Up @@ -53,6 +56,30 @@ private void registerWorkflowSignal(final Workflow wf) {
}
}

private void registerWorkflowQueryAttributes(final Workflow wf) {
String workflowType = wf.getClass().getSimpleName();
if (wf.getQueryAttributes() == null || wf.getQueryAttributes().isEmpty()) {
duoertai marked this conversation as resolved.
Show resolved Hide resolved
queryAttributeTypeStore.put(workflowType, new HashMap<>());
return;
}

for (QueryAttributeDef queryAttributeDef: wf.getQueryAttributes()) {
Map<String, Class<?>> queryAttributeKeyToTypeMap =
queryAttributeTypeStore.computeIfAbsent(workflowType, s -> new HashMap<>());
if (queryAttributeKeyToTypeMap.containsKey(queryAttributeDef.getQueryAttributeKey())) {
throw new RuntimeException(
String.format(
"Query attribute key %s already exists",
queryAttributeDef.getQueryAttributeKey())
);
}
queryAttributeKeyToTypeMap.put(
queryAttributeDef.getQueryAttributeKey(),
queryAttributeDef.getQueryAttributeType()
);
}
}

public Workflow getWorkflow(final String workflowType){
return workflowStore.get(workflowType);
}
Expand All @@ -65,6 +92,10 @@ public Map<String, Class<?>> getSignalChannelNameToSignalTypeMap(final String wo
return signalTypeStore.get(workflowType);
}

public Map<String, Class<?>> getQueryAttributeKeyToTypeMap(final String workflowType) {
return queryAttributeTypeStore.get(workflowType);
}

private String getStateDefKey(final String workflowType, final String stateId) {
return workflowType + DELIMITER + stateId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.gen.models.KeyValue;
import org.immutables.value.Value;

import java.util.Arrays;
Expand All @@ -13,6 +14,8 @@ public abstract class StateDecision {

public abstract Optional<Boolean> getWaitForMoreCommandResults();

public abstract Optional<List<KeyValue>> getUpsertQueryAttributes();

public static final StateDecision DEAD_END = ImmutableStateDecision.builder().build();
public static final StateDecision COMPLETING_WORKFLOW = ImmutableStateDecision.builder().nextStates(Arrays.asList(StateMovement.GRACEFUL_COMPLETING_WORKFLOW)).build();
public static final StateDecision FAILING_WORKFLOW = ImmutableStateDecision.builder().nextStates(Arrays.asList(StateMovement.FORCE_FAILING_WORKFLOW_MOVEMENT)).build();
Expand Down
52 changes: 41 additions & 11 deletions src/main/java/io/github/cadenceoss/iwf/core/WorkerService.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRWImpl;
import io.github.cadenceoss.iwf.core.mapper.CommandRequestMapper;
import io.github.cadenceoss.iwf.core.mapper.CommandResultsMapper;
import io.github.cadenceoss.iwf.gen.models.WorkflowStateDecideResponse;
import io.github.cadenceoss.iwf.gen.models.*;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.mapper.StateDecisionMapper;
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
import io.github.cadenceoss.iwf.gen.models.WorkflowStateDecideRequest;
import io.github.cadenceoss.iwf.gen.models.WorkflowStateStartRequest;
import io.github.cadenceoss.iwf.gen.models.WorkflowStateStartResponse;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WorkerService {
private final Registry registry;
Expand All @@ -22,18 +25,29 @@ public WorkerService(Registry registry) {
public WorkflowStateStartResponse handleWorkflowStateStart(final WorkflowStateStartRequest req) {
StateDef state = registry.getWorkflowState(req.getWorkflowType(), req.getWorkflowStateId());
final EncodedObject stateInput = req.getStateInput();
final Object input;
input = objectEncoder.decode(stateInput, state.getWorkflowState().getInputType());
CommandRequest commandRequest = state.getWorkflowState().start(null, input, null, null, null);
final Object input = objectEncoder.decode(stateInput, state.getWorkflowState().getInputType());
final QueryAttributesRWImpl queryAttributesRW =
createQueryAttributesRW(req.getWorkflowType(), req.getQueryAttributes());

CommandRequest commandRequest = state.getWorkflowState().start(
null,
input,
null,
null,
queryAttributesRW);

return new WorkflowStateStartResponse().commandRequest(CommandRequestMapper.toGenerated(commandRequest));
return new WorkflowStateStartResponse()
.commandRequest(CommandRequestMapper.toGenerated(commandRequest))
.upsertQueryAttributes(queryAttributesRW.getUpsertQueryAttributes());
}

public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowStateDecideRequest req) {
StateDef state = registry.getWorkflowState(req.getWorkflowType(), req.getWorkflowStateId());
final Object input;
final EncodedObject stateInput = req.getStateInput();
input = objectEncoder.decode(stateInput, state.getWorkflowState().getInputType());
final QueryAttributesRWImpl queryAttributesRW =
createQueryAttributesRW(req.getWorkflowType(), req.getQueryAttributes());

StateDecision stateDecision = state.getWorkflowState().decide(
null,
Expand All @@ -44,7 +58,23 @@ public WorkflowStateDecideResponse handleWorkflowStateDecide(final WorkflowState
objectEncoder),
null,
null,
null);
return new WorkflowStateDecideResponse().stateDecision(StateDecisionMapper.toGenerated(stateDecision));
queryAttributesRW);
List<KeyValue> queryAttributesToUpsert = queryAttributesRW.getUpsertQueryAttributes();
stateDecision = ImmutableStateDecision.copyOf(stateDecision).withUpsertQueryAttributes(queryAttributesToUpsert);
return new WorkflowStateDecideResponse()
.stateDecision(StateDecisionMapper.toGenerated(stateDecision));
}
duoertai marked this conversation as resolved.
Show resolved Hide resolved

private QueryAttributesRWImpl createQueryAttributesRW(String workflowType, List<KeyValue> keyValues) {
Map<String, EncodedObject> map;
if (keyValues == null || keyValues.isEmpty()) {
map = new HashMap<>();
} else {
map = keyValues.stream()
.filter(keyValue -> keyValue.getValue() != null)
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue));
}

return new QueryAttributesRWImpl(registry.getQueryAttributeKeyToTypeMap(workflowType), map, objectEncoder);
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/github/cadenceoss/iwf/core/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ default List<SearchAttributeDef> getSearchAttributes() {
/**
* defines all the query attributes supported by this workflow.
*/
default List<QueryAttributeDef<?>> getQueryAttributes() {
default List<QueryAttributeDef> getQueryAttributes() {
return Collections.emptyList();
}

Expand Down
18 changes: 10 additions & 8 deletions src/main/java/io/github/cadenceoss/iwf/core/WorkflowState.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package io.github.cadenceoss.iwf.core;

import io.github.cadenceoss.iwf.core.attributes.AttributeLoadingPolicy;
import io.github.cadenceoss.iwf.core.attributes.QueryAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.SearchAttributesRW;
import io.github.cadenceoss.iwf.core.attributes.StateLocalAttributesR;
import io.github.cadenceoss.iwf.core.attributes.StateLocalAttributesW;
import io.github.cadenceoss.iwf.core.attributes.*;
import io.github.cadenceoss.iwf.core.command.CommandCarryOverPolicy;
import io.github.cadenceoss.iwf.core.command.CommandRequest;
import io.github.cadenceoss.iwf.core.command.CommandResults;
Expand Down Expand Up @@ -42,7 +38,9 @@ default StateOptions getStateOptions() {
*/
CommandRequest start(
final Context context, I input,
final StateLocalAttributesW stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes);
final StateLocalAttributesW stateLocals,
final SearchAttributesRW searchAttributes,
final QueryAttributesRW queryAttributes);

/**
* Implement this method to decide what to do next when requested commands are ready
Expand All @@ -54,8 +52,12 @@ CommandRequest start(
* @return the decision of what to do next(e.g. transition to next states)
*/
StateDecision decide(
final Context context, final I input, final CommandResults commandResults,
final StateLocalAttributesR stateLocals, final SearchAttributesRW searchAttributes, final QueryAttributesRW queryAttributes);
final Context context,
final I input,
final CommandResults commandResults,
final StateLocalAttributesR stateLocals,
final SearchAttributesRW searchAttributes,
final QueryAttributesRW queryAttributes);
}


Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import org.immutables.value.Value;

@Value.Immutable
public abstract class QueryAttributeDef<T> {
public abstract Class<T> getQueryAttributeType();
public abstract class QueryAttributeDef {
public abstract Class getQueryAttributeType();

public abstract String getQueryAttributeKey();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package io.github.cadenceoss.iwf.core.attributes;

public class QueryAttributesRW {

public <T> T get(String key) {
return null;
}

public void upsert(String key, Object value) {
return;
}
public interface QueryAttributesRW {
<T> T get(String key, Class<T> type);
void set(String key, Object value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.github.cadenceoss.iwf.core.attributes;

import io.github.cadenceoss.iwf.core.ObjectEncoder;
import io.github.cadenceoss.iwf.gen.models.EncodedObject;
import io.github.cadenceoss.iwf.gen.models.KeyValue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class QueryAttributesRWImpl implements QueryAttributesRW{
private final Map<String, Class<?>> queryAttributeNameToTypeMap;
private final Map<String, EncodedObject> queryAttributeNameToEncodedObjectMap;
duoertai marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, EncodedObject> queryAttributesToUpsert;
private final ObjectEncoder objectEncoder;

public QueryAttributesRWImpl(
final Map<String, Class<?>> queryAttributeNameToTypeMap,
final Map<String, EncodedObject> queryAttributeNameToValueMap,
final ObjectEncoder objectEncoder) {
this.queryAttributeNameToTypeMap = queryAttributeNameToTypeMap;
this.queryAttributeNameToEncodedObjectMap = queryAttributeNameToValueMap;
this.queryAttributesToUpsert = new HashMap<>();
this.objectEncoder = objectEncoder;
}

@Override
public <T> T get(String key, Class<T> type) {
if (!queryAttributeNameToTypeMap.containsKey(key)) {
throw new RuntimeException(String.format("Query attribute %s is not registered", key));
}
if (!queryAttributeNameToEncodedObjectMap.containsKey(key)) {
return null;
}

Class<?> registeredType = queryAttributeNameToTypeMap.get(key);
if (!type.isAssignableFrom(registeredType)) {
duoertai marked this conversation as resolved.
Show resolved Hide resolved
throw new RuntimeException(
String.format(
"%s is not assignable from registered type %s",
type.getName(),
registeredType.getName()));
}

return type.cast(
objectEncoder.decode(queryAttributeNameToEncodedObjectMap.get(key), registeredType));
}

@Override
public void set(String key, Object value) {
if (!queryAttributeNameToTypeMap.containsKey(key)) {
throw new RuntimeException(String.format("Query attribute %s is not registered", key));
}

Class<?> registeredType = queryAttributeNameToTypeMap.get(key);
if (!registeredType.isInstance(value)) {
throw new RuntimeException(String.format("Input is not an instance of class %s", registeredType.getName()));
}

this.queryAttributeNameToEncodedObjectMap.put(key, objectEncoder.encode(value));
this.queryAttributesToUpsert.put(key, objectEncoder.encode(value));
}

public List<KeyValue> getUpsertQueryAttributes() {
return queryAttributesToUpsert.entrySet().stream()
.map(stringEncodedObjectEntry ->
new KeyValue()
.key(stringEncodedObjectEntry.getKey())
.value(stringEncodedObjectEntry.getValue()))
.collect(Collectors.toList());
}
}
Loading