Skip to content

Commit

Permalink
#168 Extends event log with node error details
Browse files Browse the repository at this point in the history
Move NodeFatalException to Task API.

Remove timeout status from event log.

CR fix.

Update Changelog and README.

Added log verifier unit tests, tests fixes.

Tests fixes.
  • Loading branch information
tomaszmichalak committed Jul 17, 2020
1 parent cd1d87b commit ed59aed
Show file tree
Hide file tree
Showing 20 changed files with 621 additions and 407 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ All notable changes to `knotx-fragments` will be documented in this file.

## Unreleased
List of changes that are finished but not yet released in any final version.
- [PR-172](https://github.com/Knotx/knotx-stack/pull/172) - Add a task node processing exception to event log. Remove unused 'TIMEOUT' node status. Update node unit tests.
- [PR-170](https://github.com/Knotx/knotx-stack/pull/170) - Upgrade to Vert.x `3.9.1`, replace deprecated `setHandler` with `onComplete`.

## 2.2.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* The code comes from https://github.com/tomaszmichalak/vertx-rx-map-reduce.
*/
package io.knotx.fragments.task.engine.exception;

import io.knotx.fragments.api.Fragment;
package io.knotx.fragments.task.api;

/**
* This exception informs that task processing can not be continued.
*/
public class NodeFatalException extends IllegalStateException {

private Fragment fragment;
public NodeFatalException(String message) {
super(message);
}

public NodeFatalException(Fragment fragment) {
super("Failed during fragment processing [" + fragment.getId() + "]");
this.fragment = fragment;
public NodeFatalException(Throwable cause) {
super(cause);
}

public Fragment getFragment() {
return fragment;
public NodeFatalException(String message, Throwable cause) {
super(message, cause);
}

}
34 changes: 22 additions & 12 deletions task/engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ then a fragment status is `failure`.<br/>

#### Fragment log
A fragment's log contains details about task processing. When node processing ends (or
raises an exception), the engine appends the new [entry](https://github.com/Knotx/knotx-fragments/blob/master/engine/src/main/java/io/knotx/fragments/engine/EventLogEntry.java)
in the fragment's log containing:
- task name
- node identifier
- node status
- [node log](https://github.com/Knotx/knotx-fragments/tree/master/task/api##node-log)
- transition
- timestamp.
raises an exception), the engine appends the new entry in the fragment's log containing:
- task name (`String`)
- node identifier (`String`)
- node status (`io.knotx.fragments.task.engine.EventLogEntry.NodeStatus`)
- transition (`String`)
- node log (`io.vertx.core.json.JsonObject`)
- error (`Throwable`)
- timestamp (`Long`).

Node status is a simple text value managed by the engine. It resembles a fragment's status but is a
bit more accurate (such as a `UNSUPPORTED_TRANSITION` value).
Expand All @@ -115,7 +115,17 @@ task is a graph of two nodes: `A` and `B`.
The `A` node responds with the `_success` transition. Then the `B` node starts processing and responds
with the `_succcess` transition. Finally, the fragment status is `SUCCESS` and the fragment's log contains:

| Task | Node identifier | Node status | Transition | Node Log |
|------------|-----------------|-------------|------------|-----------------|
| `taskName` | `A` | SUCCESS | `_success` | { A node log } |
| `taskName` | `B` | SUCCESS | `_success` | { B node log } |
| Timestamp | Task | Node identifier | Node status | Transition | Node Log | Error |
|-----------|------------|-----------------|-------------|------------|-----------------| ------- |
| 1111111111| `taskName` | `A` | UNPROCESSED | | | |
| 1111111112| `taskName` | `A` | SUCCESS | `_success` | { A node log } | |
| 1111111113| `taskName` | `B` | UNPROCESSED | | | |
| 1111111114| `taskName` | `B` | SUCCESS | `_success` | { B node log } | |

Please note that every time, a leaf node (in a task or subtask) responds with a transition different
than `_success` then the engine adds the unsupported transition entry. See the example below:

| Timestamp | Task | Node identifier | Node status | Transition | Node Log | Error |
|-----------|------------|-----------------|------------------------|------------|-----------------| --------- |
| 1111111111| `taskName` | `B` | ERROR | `_error` | { } | Throwable |
| 1111111112| `taskName` | `B` | UNSUPPORTED_TRANSITION | `_error` | { } | Throwable |
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,20 @@
*/
package io.knotx.fragments.task.engine;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class EventLog {

private static final String OPERATIONS_KEY = "operations";

private final List<EventLogEntry> operations;

public EventLog() {
operations = new ArrayList<>();
}

public EventLog(JsonObject json) {
operations = json.getJsonArray(OPERATIONS_KEY).stream()
.map(JsonObject.class::cast)
.map(EventLogEntry::new)
.collect(Collectors.toList());
}

public JsonObject toJson() {
final JsonArray jsonArray = new JsonArray();
operations.forEach(entry -> jsonArray.add(entry.toJson()));
return new JsonObject()
.put(OPERATIONS_KEY, jsonArray);
public EventLog(List<EventLogEntry> operations) {
this.operations = operations;
}

void append(EventLogEntry logEntry) {
Expand Down Expand Up @@ -72,10 +57,6 @@ public long getLatestTimestamp() {
.orElse(0);
}

public JsonObject getLog() {
return toJson();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,92 +12,69 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* The code comes from https://github.com/tomaszmichalak/vertx-rx-map-reduce.
*/
package io.knotx.fragments.task.engine;

import io.knotx.fragments.api.FragmentResult;
import io.vertx.core.json.JsonObject;
import java.util.Objects;

public class EventLogEntry {

private static final String TASK_KEY = "task";
private static final String NODE_KEY = "node";
private static final String STATUS_KEY = "status";
private static final String TRANSITION_KEY = "transition";
private static final String TIMESTAMP_KEY = "timestamp";
private static final String NODE_LOG_KEY = "nodeLog";
public enum NodeStatus {
SUCCESS,
UNSUPPORTED_TRANSITION,
ERROR,
UNPROCESSED
}

private final String task;
private final String node;
private final NodeStatus status;
private final String transition;
private final long timestamp;
private final JsonObject nodeLog;
private final Throwable error;

public static EventLogEntry started(String task, String node) {
return new EventLogEntry(task, node, NodeStatus.UNPROCESSED, null, null);
return new EventLogEntry(task, node, NodeStatus.UNPROCESSED, null, null, null);
}

public static EventLogEntry success(String task, String node, FragmentResult fragmentResult) {
return new EventLogEntry(task, node, NodeStatus.SUCCESS, fragmentResult.getTransition(), fragmentResult.getLog());
return new EventLogEntry(task, node, NodeStatus.SUCCESS, fragmentResult.getTransition(),
fragmentResult.getLog(), null);
}

public static EventLogEntry unsupported(String task, String node, String transition) {
return new EventLogEntry(task, node, NodeStatus.UNSUPPORTED_TRANSITION, transition,null);
return new EventLogEntry(task, node, NodeStatus.UNSUPPORTED_TRANSITION, transition, null, null);
}

public static EventLogEntry error(String task, String node, String transition, JsonObject actionLog) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition, actionLog);
public static EventLogEntry error(String task, String node, FragmentResult fragmentResult) {
return error(task, node, fragmentResult.getTransition(), fragmentResult.getLog());
}

public static EventLogEntry error(String task, String node, String transition) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition,null);
return error(task, node, transition, null);
}

public static EventLogEntry error(String task, String node, String transition, JsonObject nodeLog) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition, nodeLog, null);
}

public static EventLogEntry timeout(String task, String node) {
return new EventLogEntry(task, node, NodeStatus.TIMEOUT, null, null);
public static EventLogEntry exception(String task, String node, String transition,
Throwable error) {
return new EventLogEntry(task, node, NodeStatus.ERROR, transition, null, error);
}

private EventLogEntry(String task, String node, NodeStatus status, String transition, JsonObject nodeLog) {
private EventLogEntry(String task, String node, NodeStatus status, String transition,
JsonObject nodeLog, Throwable error) {
this.task = task;
this.node = node;
this.status = status;
this.transition = transition;
this.timestamp = System.currentTimeMillis();
this.nodeLog = nodeLog;
}

EventLogEntry(JsonObject json) {
this.task = json.getString(TASK_KEY);
this.node = json.getString(NODE_KEY);
this.status = NodeStatus.valueOf(json.getString(STATUS_KEY));
this.transition = json.getString(TRANSITION_KEY);
this.timestamp = json.getLong(TIMESTAMP_KEY);
this.nodeLog = json.getJsonObject(NODE_LOG_KEY);
}

public JsonObject toJson() {
return new JsonObject()
.put(TASK_KEY, task)
.put(NODE_KEY, node)
.put(STATUS_KEY, status.name())
.put(TRANSITION_KEY, transition)
.put(TIMESTAMP_KEY, timestamp)
.put(NODE_LOG_KEY, nodeLog);
}

@Override
public String toString() {
return "EventLogEntry{" +
"task='" + task + '\'' +
", node='" + node + '\'' +
", status=" + status +
", transition='" + transition + '\'' +
", timestamp=" + timestamp +
", nodeLog=" + nodeLog +
'}';
this.nodeLog = nodeLog == null ? new JsonObject() : nodeLog;
this.error = error;
}

public String getTask() {
Expand All @@ -124,12 +101,43 @@ public JsonObject getNodeLog() {
return nodeLog;
}

public enum NodeStatus {
SUCCESS,
UNSUPPORTED_TRANSITION,
ERROR,
TIMEOUT,
UNPROCESSED
public Throwable getError() {
return error;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventLogEntry that = (EventLogEntry) o;
return timestamp == that.timestamp &&
Objects.equals(task, that.task) &&
Objects.equals(node, that.node) &&
status == that.status &&
Objects.equals(transition, that.transition) &&
Objects.equals(nodeLog, that.nodeLog) &&
Objects.equals(error, that.error);
}

@Override
public int hashCode() {
return Objects.hash(task, node, status, transition, timestamp, nodeLog, error);
}

@Override
public String toString() {
return "EventLogEntry{" +
"task='" + task + '\'' +
", node='" + node + '\'' +
", status=" + status +
", transition='" + transition + '\'' +
", timestamp=" + timestamp +
", nodeLog=" + nodeLog +
", error=" + error +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,11 @@
import static io.knotx.fragments.api.FragmentResult.SUCCESS_TRANSITION;

import io.knotx.fragments.api.Fragment;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import java.util.Optional;

@DataObject
public class FragmentEvent {

private static final String FRAGMENT_KEY = "fragment";
private static final String LOG_KEY = "log";
private static final String STATUS_KEY = "status";

private final EventLog log;
private Fragment fragment;
private Status status;
Expand All @@ -42,19 +35,6 @@ public FragmentEvent(Fragment fragment) {
this.status = Status.UNPROCESSED;
}

public FragmentEvent(JsonObject json) {
this.fragment = new Fragment(json.getJsonObject(FRAGMENT_KEY));
this.log = new EventLog(json.getJsonObject(LOG_KEY));
this.status = Status.valueOf(json.getString(STATUS_KEY));
}

public JsonObject toJson() {
return new JsonObject()
.put(FRAGMENT_KEY, fragment.toJson())
.put(LOG_KEY, log.toJson())
.put(STATUS_KEY, status);
}

public FragmentEvent log(EventLogEntry logEntry) {
log.append(logEntry);
return this;
Expand All @@ -69,10 +49,6 @@ public FragmentEvent setFragment(Fragment fragment) {
return this;
}

public JsonObject getLogAsJson() {
return log.toJson();
}

public EventLog getLog() {
return log;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,10 @@
package io.knotx.fragments.task.engine;

import io.knotx.server.api.context.ClientRequest;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.json.JsonObject;
import java.util.Objects;

@DataObject
public class FragmentEventContext {

private static final String FRAGMENT_EVENT_KEY = "fragmentEvent";
private static final String CLIENT_REQUEST_KEY = "clientRequest";

private final FragmentEvent fragmentEvent;
private final ClientRequest clientRequest;

Expand All @@ -34,17 +28,6 @@ public FragmentEventContext(FragmentEvent fragmentEvent, ClientRequest clientReq
this.clientRequest = clientRequest;
}

public FragmentEventContext(JsonObject json) {
this.fragmentEvent = new FragmentEvent(json.getJsonObject(FRAGMENT_EVENT_KEY));
this.clientRequest = new ClientRequest(json.getJsonObject(CLIENT_REQUEST_KEY));
}

public JsonObject toJson() {
return new JsonObject()
.put(FRAGMENT_EVENT_KEY, fragmentEvent.toJson())
.put(CLIENT_REQUEST_KEY, clientRequest.toJson());
}

public FragmentEvent getFragmentEvent() {
return fragmentEvent;
}
Expand Down
Loading

0 comments on commit ed59aed

Please sign in to comment.