Skip to content

Commit

Permalink
#197 Refactor TaskEngine to prepare for propagating behaviour error d…
Browse files Browse the repository at this point in the history
…etails to consumers
  • Loading branch information
Marcin Szymura committed Oct 1, 2020
1 parent 3449fb6 commit e05f4ad
Show file tree
Hide file tree
Showing 32 changed files with 930 additions and 744 deletions.
15 changes: 14 additions & 1 deletion api/src/main/java/io/knotx/fragments/api/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public Fragment(String type, JsonObject configuration, String body) {
this.payload = new JsonObject();
}

private Fragment(String id, String type, JsonObject configuration, String body,
JsonObject payload) {
this.id = id;
this.type = type;
this.configuration = configuration;
this.body = body;
this.payload = payload;
}

public Fragment(JsonObject json) {
this.id = json.getString(ID_KEY);
this.type = json.getString(TYPE_KEY);
Expand All @@ -66,6 +75,10 @@ public JsonObject toJson() {
.put(PAYLOAD_KEY, payload);
}

public Fragment copy() {
return new Fragment(id, type, configuration.copy(), body, payload.copy());
}

/**
* Unique identifier of the Fragment. Its representaion is currently a {@code String}
* representation of {@code UUID}. It can never change during processing.
Expand Down Expand Up @@ -125,7 +138,7 @@ public JsonObject getPayload() {
* Appends new entry int the Fragment's payload. Notice, that it may overwrite any existing info
* in the payload, if the keys are identical.
*
* @param key - a key under which payload info will be saved.
* @param key - a key under which payload info will be saved.
* @param value - a value of the payload info.
* @return a reference to this, so the API can be used fluently.
*/
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/io/knotx/fragments/api/FragmentContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public FragmentContext(JsonObject json) {
this.clientRequest = new ClientRequest(json.getJsonObject(CLIENT_REQUEST_KEY));
}

public FragmentContext copy() {
return new FragmentContext(fragment.copy(), clientRequest.copy());
}

public JsonObject toJson() {
return new JsonObject()
.put(FRAGMENT_KEY, fragment.toJson())
Expand Down
4 changes: 2 additions & 2 deletions task/api/src/main/java/io/knotx/fragments/task/api/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ public class Task {
private final String name;
private final Node rootNode;

public Task(String name) {
this(name, null);
public static Task undefined() {
return new Task(UNDEFINED_TASK, null);
}

public Task(String name, Node rootNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,78 @@
*/
package io.knotx.fragments.task.engine;

import io.knotx.fragments.api.FragmentResult;
import io.knotx.fragments.task.engine.EventLogEntry.NodeStatus;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class EventLog {

private final List<EventLogEntry> operations;
private final String taskName;

public EventLog() {
public EventLog(String taskName) {
this.taskName = taskName;
operations = new ArrayList<>();
}

public EventLog(List<EventLogEntry> operations) {
this.operations = operations;
public EventLog nodeStarted(String node) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.UNPROCESSED, null, null, null));
return this;
}

void append(EventLogEntry logEntry) {
public EventLog compositeSuccess(String node, String transition) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.SUCCESS, transition, null, null));
return this;
}

public EventLog compositeUnprocessed(String node, String transition) {
// TODO: Change to NodeStatus.UNPROCESSED when validated contract
// operations.add(new EventLogEntry(taskName, node, NodeStatus.UNPROCESSED, transition, null, null));
return compositeError(node, transition);
}

public EventLog compositeError(String node, String transition) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.ERROR, transition, null, null));
return this;
}

public EventLog success(String node, FragmentResult fragmentResult) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.SUCCESS, fragmentResult.getTransition(),
fragmentResult.getLog(), null));
return this;
}

public EventLog unsupported(String node, String transition) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.UNSUPPORTED_TRANSITION, transition, null, null));
return this;
}

public EventLog error(String node, String transition) {
return error(node, transition, null);
}

public EventLog error(String node, FragmentResult fragmentResult) {
return error(node, fragmentResult.getTransition(), fragmentResult.getLog());
}

public EventLog error(String node, String transition, JsonObject nodeLog) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.ERROR, transition, nodeLog, null));
return this;
}

public EventLog exception(String node, String transition, Throwable error) {
operations.add(new EventLogEntry(taskName, node, NodeStatus.ERROR, transition, null, error));
return this;
}

public void append(EventLogEntry logEntry) {
operations.add(logEntry);
}

void appendAll(EventLog log) {
public void appendAll(EventLog log) {
this.operations.addAll(log.operations);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,37 +36,7 @@ public enum NodeStatus {
private final JsonObject nodeLog;
private final Throwable error;

public static EventLogEntry started(String task, String node) {
return new EventLogEntry(task, node, NodeStatus.UNPROCESSED, null, null, null);
}

public static EventLogEntry success(String task, String node, FragmentResult fragmentResult) {
return new EventLogEntry(task, node, NodeStatus.SUCCESS, fragmentResult.getTransition(),
fragmentResult.getLog(), null);
}

public static EventLogEntry unsupported(String task, String node, String transition) {
return new EventLogEntry(task, node, NodeStatus.UNSUPPORTED_TRANSITION, transition, null, null);
}

public static EventLogEntry error(String task, String node, FragmentResult fragmentResult) {
return error(task, node, fragmentResult.getTransition(), fragmentResult.getLog());
}

public static EventLogEntry error(String task, String node, String transition) {
return error(task, node, transition, null);
}

public static EventLogEntry error(String task, String node, String transition, JsonObject nodeLog) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition, nodeLog, null);
}

public static EventLogEntry exception(String task, String node, String transition,
Throwable error) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition, null, error);
}

private EventLogEntry(String task, String node, NodeStatus status, String transition,
EventLogEntry(String task, String node, NodeStatus status, String transition,
JsonObject nodeLog, Throwable error) {
this.task = task;
this.node = node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
*/
package io.knotx.fragments.task.engine;

import io.knotx.fragments.api.Fragment;
import io.knotx.fragments.api.FragmentContext;
import io.knotx.fragments.task.api.Task;
import io.knotx.server.api.context.ClientRequest;

public class FragmentEventContextTaskAware {
public class FragmentContextTaskAware {

private final Task task;
private final FragmentEventContext fragmentEventContext;
private final FragmentContext fragmentContext;

public FragmentEventContextTaskAware(Task task,
FragmentEventContext fragmentEventContext) {
public FragmentContextTaskAware(Task task,
ClientRequest clientRequest, Fragment fragment) {
this.task = task;
this.fragmentEventContext = fragmentEventContext;
this.fragmentContext = new FragmentContext(fragment, clientRequest);
}

public Task getTask() {
return task;
}

public FragmentEventContext getFragmentEventContext() {
return fragmentEventContext;
public FragmentContext getFragmentContext() {
return fragmentContext;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
*/
package io.knotx.fragments.task.engine;

import io.knotx.fragments.task.engine.FragmentEvent.Status;
import io.knotx.fragments.task.engine.TaskResult.Status;
import io.knotx.fragments.task.api.Node;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.reactivex.core.Vertx;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -50,52 +49,47 @@ public FragmentsEngine(Vertx vertx) {
* @return asynchronous response containing processed list of fragment events returned in the same
* order as the original list
*/
public Single<List<FragmentEvent>> execute(List<FragmentEventContextTaskAware> fragments) {

public Single<List<TaskResult>> execute(List<FragmentContextTaskAware> fragments) {
return Flowable.just(fragments)
.concatMap(Flowable::fromIterable)
.map(fragmentCtx -> fragmentCtx
.flatMapSingle(ctx -> ctx
.getTask()
.getRootNode()
.map(rootNode -> startTaskEngine(fragmentCtx, rootNode))
.orElseGet(() -> Single.just(fragmentCtx.getFragmentEventContext().getFragmentEvent()))
.map(rootNode -> startTaskEngine(ctx, rootNode))
.orElseGet(() -> Single.just(new TaskResult(ctx.getTask().getName(), ctx.getFragmentContext().getFragment())))
)
.flatMap(Single::toFlowable)
.reduce(new ArrayList<FragmentEvent>(), (list, item) -> {
list.add(item);
return list;
})
.toList()
.map(list -> incomingOrder(list, fragments))
.map(this::traceEngineResults);
}

private Single<FragmentEvent> startTaskEngine(FragmentEventContextTaskAware fragment, Node rootNode) {
return taskEngine.start(fragment.getTask().getName(), rootNode, fragment.getFragmentEventContext());
private Single<TaskResult> startTaskEngine(FragmentContextTaskAware context, Node rootNode) {
return taskEngine.start(context.getTask().getName(), rootNode, context.getFragmentContext());
}

private List<FragmentEvent> incomingOrder(
List<FragmentEvent> list, List<FragmentEventContextTaskAware> sourceEvents) {
private List<TaskResult> incomingOrder(
List<TaskResult> list, List<FragmentContextTaskAware> sourceEvents) {

return sourceEvents.stream()
.map(event -> event.getFragmentEventContext().getFragmentEvent().getFragment().getId())
.map(context -> context.getFragmentContext().getFragment().getId())
.map(id -> getFragmentFromListById(id, list))
.collect(Collectors.toList());
}

private FragmentEvent getFragmentFromListById(String id, List<FragmentEvent> events) {
private TaskResult getFragmentFromListById(String id, List<TaskResult> events) {
return events
.stream()
.filter(event -> id.equals(event.getFragment().getId()))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Could not find fragment with id: " + id));
}

private List<FragmentEvent> traceEngineResults(List<FragmentEvent> results) {
private List<TaskResult> traceEngineResults(List<TaskResult> results) {
if (LOGGER.isTraceEnabled()) {
List<FragmentEvent> processedEvents = results.stream()
List<TaskResult> processedEvents = results.stream()
.filter(event -> Status.UNPROCESSED != event.getStatus())
.collect(Collectors.toList());
LOGGER.trace("Knot Engine processed fragments: [{}]", processedEvents);
LOGGER.trace("Task Engine processed fragments: [{}]", processedEvents);
}
return results;
}
Expand Down
Loading

0 comments on commit e05f4ad

Please sign in to comment.