diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java index b9075725f..d03399034 100644 --- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java +++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java @@ -41,9 +41,7 @@ public sealed interface Response { String scheduleId(); String tenantId(); } - public sealed interface Success extends Response {} - public record AnyMessage(String scheduleId, String tenantId) implements Success {} - public record LastMessage(String scheduleId, String tenantId) implements Success {} + public record Success(String scheduleId, String tenantId) implements Response {} public record Failure(Throwable exception, String scheduleId, String tenantId) implements Response {} @@ -125,12 +123,7 @@ private static Behavior initPipeline( indexWriterResponseWrapper.response(); if (response instanceof IndexWriterActor.Success) { - if (dataPayload.isLast()) { - replyTo.tell(new LastMessage(scheduleId, dataPayload.getTenantId())); - } - else { - replyTo.tell(new AnyMessage(scheduleId, dataPayload.getTenantId())); - } + replyTo.tell(new Success(scheduleId, dataPayload.getTenantId())); } else if (response instanceof IndexWriterActor.Failure) { replyTo.tell(new Failure( @@ -142,11 +135,10 @@ else if (response instanceof IndexWriterActor.Failure) { return Behaviors.stopped(); }) .onSignal(ChildFailed.class, childFailed -> { - replyTo.tell( - new Failure( - childFailed.cause(), - dataPayload.getTenantId(), - dataPayload.getScheduleId())); + replyTo.tell(new Failure( + childFailed.cause(), + dataPayload.getTenantId(), + dataPayload.getScheduleId())); return Behaviors.stopped(); }) @@ -234,7 +226,7 @@ else if (r instanceof EnrichItemSupervisor.Error) { logger.error( "behaviorOnError is REJECT, stop pipeline: " + enrichItemError.getId(), param.exception); - replyTo.tell(new AnyMessage(dataPayload.getScheduleId(), dataPayload.getTenantId())); + replyTo.tell(new Success(dataPayload.getScheduleId(), dataPayload.getTenantId())); return Behaviors.stopped(); }