Skip to content

Commit

Permalink
Issue #495: refactor EnrichPipeline Success response
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-vi committed Jun 28, 2023
1 parent ea3d10d commit f336ccc
Showing 1 changed file with 7 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ public sealed interface Response {
String scheduleId();
String tenantId();
}
public sealed interface Success extends Response {}
public record AnyMessage(String scheduleId, String tenantId) implements Success {}
public record LastMessage(String scheduleId, String tenantId) implements Success {}
public record Success(String scheduleId, String tenantId) implements Response {}

public record Failure(Throwable exception, String scheduleId, String tenantId) implements Response {}

Expand Down Expand Up @@ -125,12 +123,7 @@ private static Behavior<Command> initPipeline(
indexWriterResponseWrapper.response();

if (response instanceof IndexWriterActor.Success) {
if (dataPayload.isLast()) {
replyTo.tell(new LastMessage(scheduleId, dataPayload.getTenantId()));
}
else {
replyTo.tell(new AnyMessage(scheduleId, dataPayload.getTenantId()));
}
replyTo.tell(new Success(scheduleId, dataPayload.getTenantId()));
}
else if (response instanceof IndexWriterActor.Failure) {
replyTo.tell(new Failure(
Expand All @@ -142,11 +135,10 @@ else if (response instanceof IndexWriterActor.Failure) {
return Behaviors.stopped();
})
.onSignal(ChildFailed.class, childFailed -> {
replyTo.tell(
new Failure(
childFailed.cause(),
dataPayload.getTenantId(),
dataPayload.getScheduleId()));
replyTo.tell(new Failure(
childFailed.cause(),
dataPayload.getTenantId(),
dataPayload.getScheduleId()));

return Behaviors.stopped();
})
Expand Down Expand Up @@ -234,7 +226,7 @@ else if (r instanceof EnrichItemSupervisor.Error) {
logger.error(
"behaviorOnError is REJECT, stop pipeline: " + enrichItemError.getId(), param.exception);

replyTo.tell(new AnyMessage(dataPayload.getScheduleId(), dataPayload.getTenantId()));
replyTo.tell(new Success(dataPayload.getScheduleId(), dataPayload.getTenantId()));

return Behaviors.stopped();
}
Expand Down

0 comments on commit f336ccc

Please sign in to comment.