diff --git a/core/app/datasource/pom.xml b/core/app/datasource/pom.xml
index 0285a355b..f1a202aed 100644
--- a/core/app/datasource/pom.xml
+++ b/core/app/datasource/pom.xml
@@ -195,6 +195,10 @@
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-typed_${scala.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-cluster-sharding-typed_${scala.version}</artifactId>
+ </dependency>
<dependency>
<groupId>com.lightbend.akka.management</groupId>
<artifactId>akka-management_${scala.version}</artifactId>
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemConfig.java b/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemConfig.java
index 065115787..f70ab554d 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemConfig.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemConfig.java
@@ -1,8 +1,15 @@
package io.openk9.datasource.actor;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.typed.Cluster;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
+import io.openk9.datasource.model.ScheduleId;
+import io.openk9.datasource.pipeline.actor.Schedulation;
+import io.openk9.datasource.pipeline.actor.enrichitem.Token;
+import io.openk9.datasource.service.DatasourceService;
+import io.openk9.datasource.sql.TransactionInvoker;
import io.quarkus.arc.Priority;
import io.quarkus.arc.properties.IfBuildProperty;
import org.jboss.logging.Logger;
@@ -11,6 +18,7 @@
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
+import java.util.UUID;
@Dependent
public class ActorSystemConfig {
@@ -41,7 +49,41 @@ public ActorSystemInitializer cluster() {
};
}
+ @Produces
+ @ApplicationScoped
+ public ActorSystemInitializer clusterSharding() {
+ logger.info("init cluster sharding");
+ return actorSystem -> {
+ ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
+
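+ // sharded entity ids are built by SchedulationKeyUtils as "<tenantId>#<scheduleId>"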
+ clusterSharding.init(Entity.of(Schedulation.ENTITY_TYPE_KEY, entityCtx -> {
+ String entityId = entityCtx.getEntityId();
+ String[] strings = entityId.split("#");
+ Schedulation.SchedulationKey key =
+ new Schedulation.SchedulationKey(
+ strings[0],
+ new ScheduleId(UUID.fromString(strings[1])));
+ return Schedulation.create(key, transactionInvoker, datasourceService);
+ }));
+
+ clusterSharding.init(Entity.of(Token.ENTITY_TYPE_KEY, entityCtx -> {
+ String entityId = entityCtx.getEntityId();
+ String[] strings = entityId.split("#");
+ Schedulation.SchedulationKey key =
+ new Schedulation.SchedulationKey(
+ strings[0],
+ new ScheduleId(UUID.fromString(strings[1])));
+ return Token.create(key);
+ }));
+
+ };
+ }
+
@Inject
Logger logger;
+ @Inject
+ TransactionInvoker transactionInvoker;
+ @Inject
+ DatasourceService datasourceService;
}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemProvider.java b/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemProvider.java
index 010732523..ca9ac9c02 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemProvider.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/actor/ActorSystemProvider.java
@@ -1,11 +1,9 @@
package io.openk9.datasource.actor;
import akka.actor.typed.ActorSystem;
-import akka.cluster.typed.ClusterSingleton;
+import akka.actor.typed.javadsl.Behaviors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import io.openk9.datasource.pipeline.actor.IngestionActor;
-import io.openk9.datasource.pipeline.actor.mapper.PipelineMapper;
import io.quarkus.runtime.Startup;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -25,7 +23,7 @@ void init() {
Config defaultConfig = ConfigFactory.load(clusterFile);
Config config = defaultConfig.withFallback(ConfigFactory.load());
- actorSystem = ActorSystem.create(IngestionActor.create(pipelineMapper), "datasource", config);
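+ // the guardian behavior is now empty: pipeline work is handled by the sharded entities registered through the ActorSystemInitializer instances below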
+ actorSystem = ActorSystem.create(Behaviors.empty(), "datasource", config);
for (ActorSystemInitializer actorSystemInitializer : actorSystemInitializerInstance) {
actorSystemInitializer.init(actorSystem);
@@ -42,16 +40,10 @@ public ActorSystem<?> getActorSystem() {
return actorSystem;
}
- public ClusterSingleton getClusterSingleton() {
- return ClusterSingleton.get(actorSystem);
- }
-
private ActorSystem<?> actorSystem;
@Inject
Instance<ActorSystemInitializer> actorSystemInitializerInstance;
- @Inject
- PipelineMapper pipelineMapper;
@ConfigProperty(name = "akka.cluster.file")
String clusterFile;
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/listener/Scheduler.java b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobTriggerer.java
similarity index 99%
rename from core/app/datasource/src/main/java/io/openk9/datasource/listener/Scheduler.java
rename to core/app/datasource/src/main/java/io/openk9/datasource/listener/JobTriggerer.java
index 4ff4b3a2f..31960f8e1 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/listener/Scheduler.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/listener/JobTriggerer.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.UUID;
-public class Scheduler {
+public class JobTriggerer {
public sealed interface Command extends CborSerializable {}
public record ScheduleDatasource(String tenantName, long datasourceId, boolean schedulable, String cron) implements Command {}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/listener/SchedulerInitializerActor.java b/core/app/datasource/src/main/java/io/openk9/datasource/listener/SchedulerInitializerActor.java
index cb0fba2e8..62a712b9c 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/listener/SchedulerInitializerActor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/listener/SchedulerInitializerActor.java
@@ -14,23 +14,23 @@
public class SchedulerInitializerActor {
public void scheduleDataSource(String tenantName, long datasourceId, boolean schedulable, String cron) {
- getSchedulerRef().tell(new Scheduler.ScheduleDatasource(tenantName, datasourceId, schedulable, cron));
+ getSchedulerRef().tell(new JobTriggerer.ScheduleDatasource(tenantName, datasourceId, schedulable, cron));
}
public void unScheduleDataSource(String tenantName, long datasourceId) {
- getSchedulerRef().tell(new Scheduler.UnScheduleDatasource(tenantName, datasourceId));
+ getSchedulerRef().tell(new JobTriggerer.UnScheduleDatasource(tenantName, datasourceId));
}
public void triggerDataSource(
String tenantName, long datasourceId, boolean startFromFirst) {
- getSchedulerRef().tell(new Scheduler.TriggerDatasource(tenantName, datasourceId, startFromFirst));
+ getSchedulerRef().tell(new JobTriggerer.TriggerDatasource(tenantName, datasourceId, startFromFirst));
}
- private ActorRef<Scheduler.Command> getSchedulerRef() {
+ private ActorRef<JobTriggerer.Command> getSchedulerRef() {
return ClusterSingleton.get(actorSystemProvider.getActorSystem())
.init(
SingletonActor.of(
- Scheduler.create(
+ JobTriggerer.create(
httpPluginDriverClient, transactionInvoker
), "scheduler")
);
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/SchedulationKeyUtils.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/SchedulationKeyUtils.java
new file mode 100644
index 000000000..3878638a3
--- /dev/null
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/SchedulationKeyUtils.java
@@ -0,0 +1,24 @@
+package io.openk9.datasource.pipeline;
+
+import io.openk9.datasource.model.ScheduleId;
+import io.openk9.datasource.pipeline.actor.Schedulation;
+
+import java.util.UUID;
+
+public class SchedulationKeyUtils {
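+ // a SchedulationKey is serialized as "<tenantId>#<scheduleId>" and used as the cluster-sharding entity id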
+
+ public static String getValue(Schedulation.SchedulationKey key) {
+ return getValue(key.tenantId(), key.scheduleId().getValue());
+ }
+
+ public static String getValue(String tenantId, String scheduleId) {
+ return tenantId + "#" + scheduleId;
+ }
+
+ public static Schedulation.SchedulationKey getKey(
+ String tenantId, String scheduleId) {
+
+ return new Schedulation.SchedulationKey(
+ tenantId, new ScheduleId(UUID.fromString(scheduleId)));
+ }
+}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java
index c94adbd86..dd8319334 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/EnrichPipeline.java
@@ -7,10 +7,10 @@
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.SingletonActor;
import io.openk9.common.util.collection.Collections;
+import io.openk9.datasource.model.Datasource;
import io.openk9.datasource.model.EnrichItem;
-import io.openk9.datasource.pipeline.actor.dto.GetDatasourceDTO;
-import io.openk9.datasource.pipeline.actor.dto.GetEnrichItemDTO;
-import io.openk9.datasource.pipeline.actor.dto.SchedulerDTO;
+import io.openk9.datasource.model.EnrichPipelineItem;
+import io.openk9.datasource.model.Scheduler;
import io.openk9.datasource.pipeline.actor.enrichitem.EnrichItemSupervisor;
import io.openk9.datasource.pipeline.actor.enrichitem.HttpSupervisor;
import io.openk9.datasource.processor.payload.DataPayload;
@@ -34,7 +34,7 @@ private record IndexWriterResponseWrapper(
IndexWriterActor.Response response) implements Command {}
private record EnrichItemSupervisorResponseWrapper(
EnrichItemSupervisor.Response response) implements Command {}
- private record EnrichItemError(GetEnrichItemDTO enrichItem, Throwable exception) implements Command {}
+ private record EnrichItemError(EnrichItem enrichItem, Throwable exception) implements Command {}
private record InternalResponseWrapper(byte[] jsonObject) implements Command {}
private record InternalError(String error) implements Command {}
public sealed interface Response {
@@ -48,9 +48,9 @@ public record LastMessage(String scheduleId, String tenantId) implements Success
public record Failure(Throwable exception, String scheduleId, String tenantId) implements Response {}
public static Behavior<Command> create(
- ActorRef<HttpSupervisor.Command> supervisorActorRef,
+ Schedulation.SchedulationKey key,
ActorRef<Response> replyTo, DataPayload dataPayload,
- GetDatasourceDTO datasource, SchedulerDTO scheduler) {
+ Scheduler scheduler) {
return Behaviors.setup(ctx -> {
@@ -61,14 +61,20 @@ public static Behavior create(
IndexWriterActor.Response.class,
IndexWriterResponseWrapper::new);
+ Datasource datasource = scheduler.getDatasource();
+ io.openk9.datasource.model.EnrichPipeline enrichPipeline =
+ datasource.getEnrichPipeline();
log.info("start pipeline for datasource with id {}", datasource.getId());
+ ActorRef<HttpSupervisor.Command> supervisorActorRef =
+ ctx.spawnAnonymous(HttpSupervisor.create(key));
+
return Behaviors.receive(Command.class)
.onMessageEquals(
Start.INSTANCE, () ->
initPipeline(
ctx, supervisorActorRef, responseActorRef, replyTo, dataPayload,
- scheduler, datasource.getEnrichItems())
+ scheduler, enrichPipeline.getEnrichPipelineItems())
)
.build();
});
@@ -79,11 +85,12 @@ private static Behavior<Command> initPipeline(
ActorRef<HttpSupervisor.Command> supervisorActorRef,
ActorRef<IndexWriterActor.Response> responseActorRef,
ActorRef<Response> replyTo, DataPayload dataPayload,
- SchedulerDTO scheduler,
- Set<GetEnrichItemDTO> enrichPipelineItems) {
+ Scheduler scheduler, Set<EnrichPipelineItem> enrichPipelineItems) {
Logger logger = ctx.getLog();
+ String scheduleId = scheduler.getScheduleId().getValue();
+
if (enrichPipelineItems.isEmpty()) {
logger.info("pipeline is empty, start index writer");
@@ -114,16 +121,16 @@ private static Behavior initPipeline(
if (response instanceof IndexWriterActor.Success) {
if (dataPayload.isLast()) {
- replyTo.tell(new LastMessage(scheduler.getScheduleId(), dataPayload.getTenantId()));
+ replyTo.tell(new LastMessage(scheduleId, dataPayload.getTenantId()));
}
else {
- replyTo.tell(new AnyMessage(scheduler.getScheduleId(), dataPayload.getTenantId()));
+ replyTo.tell(new AnyMessage(scheduleId, dataPayload.getTenantId()));
}
}
else if (response instanceof IndexWriterActor.Failure) {
replyTo.tell(new Failure(
((IndexWriterActor.Failure) response).exception(),
- scheduler.getScheduleId(),
+ scheduleId,
dataPayload.getTenantId()));
}
@@ -133,15 +140,15 @@ else if (response instanceof IndexWriterActor.Failure) {
}
- GetEnrichItemDTO enrichItem = Collections.head(enrichPipelineItems);
- Set<GetEnrichItemDTO> tail = Collections.tail(enrichPipelineItems);
+ EnrichPipelineItem enrichPipelineItem = Collections.head(enrichPipelineItems);
+ EnrichItem enrichItem = enrichPipelineItem.getEnrichItem();
+ Set<EnrichPipelineItem> tail = Collections.tail(enrichPipelineItems);
logger.info("start enrich for enrichItem with id {}", enrichItem.getId());
String jsonPath = enrichItem.getJsonPath();
- EnrichItem.BehaviorMergeType behaviorMergeType =
- EnrichItem.BehaviorMergeType.valueOf(enrichItem.getBehaviorMergeType());
+ EnrichItem.BehaviorMergeType behaviorMergeType = enrichItem.getBehaviorMergeType();
ActorRef enrichItemSupervisorRef =
ctx.spawnAnonymous(EnrichItemSupervisor.create(supervisorActorRef));
@@ -177,10 +184,9 @@ else if (r instanceof EnrichItemSupervisor.Error) {
return Behaviors.receive(Command.class)
.onMessage(EnrichItemError.class, param -> {
- GetEnrichItemDTO enrichItemError = param.enrichItem();
+ EnrichItem enrichItemError = param.enrichItem();
- EnrichItem.BehaviorOnError behaviorOnError =
- EnrichItem.BehaviorOnError.valueOf(enrichItem.getBehaviorOnError());
+ EnrichItem.BehaviorOnError behaviorOnError = enrichItemError.getBehaviorOnError();
switch (behaviorOnError) {
case SKIP -> {
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IndexWriterActor.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IndexWriterActor.java
index 9dc76b507..ad43f8ca1 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IndexWriterActor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IndexWriterActor.java
@@ -6,7 +6,7 @@
import akka.actor.typed.javadsl.Behaviors;
import io.openk9.datasource.events.DatasourceEvent;
import io.openk9.datasource.events.DatasourceEventBus;
-import io.openk9.datasource.pipeline.actor.dto.SchedulerDTO;
+import io.openk9.datasource.model.Scheduler;
import io.openk9.datasource.processor.payload.DataPayload;
import io.openk9.datasource.util.CborSerializable;
import io.vertx.core.buffer.Buffer;
@@ -35,9 +35,9 @@
public class IndexWriterActor {
public sealed interface Command extends CborSerializable {}
- public record Start(SchedulerDTO schedulerDTO, byte[] dataPayload, ActorRef<Response> replyTo)
+ public record Start(Scheduler scheduler, byte[] dataPayload, ActorRef<Response> replyTo)
implements Command {}
- private record SearchResponseCommand(SchedulerDTO schedulerDTO, DataPayload dataPayload, ActorRef<Response> replyTo, SearchResponse searchResponse, Exception exception) implements Command {}
+ private record SearchResponseCommand(Scheduler scheduler, DataPayload dataPayload, ActorRef<Response> replyTo, SearchResponse searchResponse, Exception exception) implements Command {}
private record BulkResponseCommand(ActorRef<Response> replyTo, BulkResponse bulkResponse, DataPayload dataPayload, Exception exception) implements Command {}
public sealed interface Response extends CborSerializable {}
public enum Success implements Response {INSTANCE}
@@ -65,7 +65,7 @@ private static Behavior initial(
Logger logger = ctx.getLog();
return Behaviors.receive(Command.class)
- .onMessage(Start.class, (start) -> onStart(ctx, restHighLevelClient, start.schedulerDTO, start.dataPayload, start.replyTo))
+ .onMessage(Start.class, (start) -> onStart(ctx, restHighLevelClient, start.scheduler, start.dataPayload, start.replyTo))
.onMessage(SearchResponseCommand.class, src -> onSearchResponseCommand(
ctx, restHighLevelClient, src, logger))
.onMessage(BulkResponseCommand.class, brc -> onBulkResponseCommand(logger, brc, eventBus))
@@ -136,12 +136,12 @@ private static Behavior onSearchResponseCommand(
SearchResponseCommand src, Logger logger) {
Exception exception = src.exception;
- SchedulerDTO schedulerDTO = src.schedulerDTO;
+ Scheduler scheduler = src.scheduler;
DataPayload dataPayload = src.dataPayload;
SearchResponse searchResponse = src.searchResponse;
ActorRef replyTo = src.replyTo;
- String oldDataIndexName = schedulerDTO.getOldDataIndexName();
- String newDataIndexName = schedulerDTO.getNewDataIndexName();
+ String oldDataIndexName = null;
+ String newDataIndexName = null;
if (exception != null) {
logger.error("Error on search", exception);
@@ -150,6 +150,13 @@ private static Behavior onSearchResponseCommand(
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
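+ // the Scheduler entity may reference an old and/or a new data index; resolve whichever names are present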
+ if (scheduler.getOldDataIndex() != null) {
+ oldDataIndexName = scheduler.getOldDataIndex().getName();
+ }
+ if (scheduler.getNewDataIndex() != null) {
+ newDataIndexName = scheduler.getNewDataIndex().getName();
+ }
+
if (oldDataIndexName != null) {
bulkRequest.add(createDocWriteRequest(ctx, oldDataIndexName, dataPayload, logger, searchResponse));
}
@@ -180,14 +187,18 @@ public void onFailure(Exception e) {
private static Behavior<Command> onStart(
ActorContext<Command> ctx, RestHighLevelClient restHighLevelClient,
- SchedulerDTO schedulerDTO, byte[] data,
+ Scheduler scheduler, byte[] data,
ActorRef<Response> replyTo) {
DataPayload dataPayload = Json.decodeValue(Buffer.buffer(data), DataPayload.class);
ctx.getLog().info("index writer start for content: " + dataPayload.getContentId());
- String oldDataIndexName = schedulerDTO.getOldDataIndexName();
+ String oldDataIndexName = null;
+
+ if (scheduler.getOldDataIndex() != null) {
+ oldDataIndexName = scheduler.getOldDataIndex().getName();
+ }
if (oldDataIndexName != null) {
SearchRequest searchRequest = new SearchRequest(oldDataIndexName);
@@ -209,21 +220,21 @@ private static Behavior onStart(
public void onResponse(SearchResponse searchResponse) {
ctx.getSelf().tell(
new SearchResponseCommand(
- schedulerDTO, dataPayload, replyTo, searchResponse, null));
+ scheduler, dataPayload, replyTo, searchResponse, null));
}
@Override
public void onFailure(Exception e) {
ctx.getSelf().tell(
new SearchResponseCommand(
- schedulerDTO, dataPayload, replyTo, null, e));
+ scheduler, dataPayload, replyTo, null, e));
}
});
}
else {
ctx.getSelf().tell(
new SearchResponseCommand(
- schedulerDTO, dataPayload, replyTo, null, null));
+ scheduler, dataPayload, replyTo, null, null));
}
return Behaviors.same();
}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActor.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActor.java
deleted file mode 100644
index ba00d33bd..000000000
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActor.java
+++ /dev/null
@@ -1,402 +0,0 @@
-package io.openk9.datasource.pipeline.actor;
-
-import akka.actor.typed.ActorRef;
-import akka.actor.typed.Behavior;
-import akka.actor.typed.PostStop;
-import akka.actor.typed.PreRestart;
-import akka.actor.typed.SupervisorStrategy;
-import akka.actor.typed.javadsl.ActorContext;
-import akka.actor.typed.javadsl.Behaviors;
-import akka.cluster.typed.ClusterSingleton;
-import akka.cluster.typed.SingletonActor;
-import io.openk9.common.util.VertxUtil;
-import io.openk9.datasource.model.EnrichItem;
-import io.openk9.datasource.model.ScheduleId;
-import io.openk9.datasource.model.Scheduler;
-import io.openk9.datasource.pipeline.SchedulerManager;
-import io.openk9.datasource.pipeline.actor.dto.GetDatasourceDTO;
-import io.openk9.datasource.pipeline.actor.dto.GetEnrichItemDTO;
-import io.openk9.datasource.pipeline.actor.dto.SchedulerDTO;
-import io.openk9.datasource.pipeline.actor.enrichitem.EnrichItemSupervisor;
-import io.openk9.datasource.pipeline.actor.enrichitem.HttpSupervisor;
-import io.openk9.datasource.pipeline.actor.mapper.PipelineMapper;
-import io.openk9.datasource.processor.payload.DataPayload;
-import io.openk9.datasource.sql.TransactionInvoker;
-import io.vertx.core.json.JsonObject;
-import org.eclipse.microprofile.reactive.messaging.Message;
-
-import javax.enterprise.inject.spi.CDI;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class IngestionActor {
- public sealed interface Command {}
- public record IngestionMessage(DataPayload dataPayload, Message> message) implements Command {}
- public record Callback(String tokenId, byte[] body) implements Command {}
- private record DatasourceResponseWrapper(Message> message, DataPayload dataPayload, Datasource.Response response) implements Command {}
- private record EnrichItemResponseWrapper(EnrichItemActor.EnrichItemCallbackResponse response, Map datasourcePayload, ActorRef replyTo) implements Command {}
- private record SupervisorResponseWrapper(EnrichItemSupervisor.Response response, ActorRef replyTo) implements Command {}
- public record EnrichItemCallback(long enrichItemId, String tenantId, Map datasourcePayload, ActorRef replyTo) implements Command {}
- private record InitPipeline(Message> message, DataPayload dataPayload, GetDatasourceDTO datasource) implements Command {}
- private record CreateEnrichPipeline(Message> message, DataPayload dataPayload, GetDatasourceDTO datasource, SchedulerDTO scheduler) implements Command {}
- private record EnrichPipelineResponseWrapper(Message> message, EnrichPipeline.Response response) implements Command {}
- public sealed interface Response {}
- public record EnrichItemCallbackResponse(byte[] jsonObject) implements Response {}
- public record EnrichItemCallbackError(String message) implements Response {}
-
- public static Behavior create(PipelineMapper pipelineMapper) {
- return Behaviors
- .supervise(Behaviors.setup(ctx -> {
-
- ActorRef supervisorActorRef =
- ctx.spawn(
- HttpSupervisor.create(),
- "enrich-pipeline-supervisor");
-
- ActorRef enrichItemActorRef =
- ctx.spawn(
- EnrichItemActor.create(),
- "enrich-item-actor");
-
- ClusterSingleton clusterSingleton = ClusterSingleton.get(ctx.getSystem());
-
- ActorRef datasourceActorRef =
- clusterSingleton.init(SingletonActor.of(Datasource.create(pipelineMapper), "datasource"));
-
- TransactionInvoker transactionInvoker = CDI.current().select(TransactionInvoker.class).get();
-
- ActorRef schedulerManagerActorRef =
- ctx.spawn(SchedulerManager.create(transactionInvoker, datasourceActorRef), "scheduler-manager");
-
- return initial(
- ctx, transactionInvoker, datasourceActorRef, pipelineMapper,
- supervisorActorRef, enrichItemActorRef, schedulerManagerActorRef, new ArrayList<>());
- }))
- .onFailure(SupervisorStrategy.restartWithBackoff(
- Duration.ofMillis(500), Duration.ofSeconds(5), 0.1));
- }
-
- private static Behavior initial(
- ActorContext ctx,
- TransactionInvoker transactionInvoker,
- ActorRef datasourceActorRef,
- PipelineMapper pipelineMapper,
- ActorRef supervisorActorRef,
- ActorRef enrichItemActorRef,
- ActorRef schedulerManagerActorRef,
- List> messages) {
-
- return Behaviors.receive(Command.class)
- .onMessage(IngestionMessage.class, ingestionMessage -> {
-
- DataPayload dataPayload = ingestionMessage.dataPayload;
- Message> message = ingestionMessage.message;
- String ingestionId = dataPayload.getIngestionId();
-
- String contentId = dataPayload.getContentId();
-
- if (contentId != null) {
-
- ctx.getLog().info(
- "read message ingestionId: {}, contentId: {}",
- ingestionId, contentId);
-
- ActorRef responseActorRef =
- ctx.messageAdapter(
- Datasource.Response.class,
- response -> new DatasourceResponseWrapper(message, dataPayload, response)
- );
-
- datasourceActorRef.tell(new Datasource.GetDatasource(
- dataPayload.getTenantId(), dataPayload.getDatasourceId(), dataPayload.getParsingDate(),
- responseActorRef));
-
- List> newMessages = new ArrayList<>(messages);
- newMessages.add(message);
-
- return initial(
- ctx, transactionInvoker, datasourceActorRef, pipelineMapper, supervisorActorRef, enrichItemActorRef, schedulerManagerActorRef, newMessages);
-
- }
- else {
-
- ctx
- .getLog()
- .info(
- "contentId is null for scheduleId {} and tenantId {}",
- dataPayload.getScheduleId(), dataPayload.getTenantId());
-
- schedulerManagerActorRef.tell(
- new SchedulerManager.LastMessage(
- dataPayload.getScheduleId(),
- dataPayload.getTenantId()));
-
- message.ack();
-
- return Behaviors.same();
-
- }
-
-
- })
- .onMessage(DatasourceResponseWrapper.class, drw -> {
-
- Datasource.Response response = drw.response;
- Message> message = drw.message;
- DataPayload dataPayload = drw.dataPayload;
-
- if (response instanceof Datasource.GetDatasourceSuccess) {
- GetDatasourceDTO getDatasourceDTO = ((Datasource.GetDatasourceSuccess) response).datasource();
- ctx.getSelf().tell(new InitPipeline(message, dataPayload, getDatasourceDTO));
- }
- else if (response instanceof Datasource.Failure) {
- Throwable exception = ((Datasource.Failure) response).exception();
- ctx.getLog().error("get datasource failure, nack message", exception);
- message.nack(exception);
- List> newMessages = new ArrayList<>(messages);
- newMessages.remove(message);
-
- return initial(
- ctx, transactionInvoker, datasourceActorRef, pipelineMapper,
- supervisorActorRef, enrichItemActorRef, schedulerManagerActorRef, newMessages);
- }
- return Behaviors.same();
-
- })
- .onMessage(InitPipeline.class, ip -> onInitPipeline(ctx, transactionInvoker, pipelineMapper, ip))
- .onMessage(CreateEnrichPipeline.class, cep -> onCreateEnrichPipeline(ctx, supervisorActorRef, cep))
- .onMessage(EnrichPipelineResponseWrapper.class, eprw ->
- onEnrichPipelineResponseWrapper(
- ctx, transactionInvoker, datasourceActorRef, pipelineMapper, eprw,
- supervisorActorRef, enrichItemActorRef, schedulerManagerActorRef, messages))
- .onMessage(Callback.class, callback -> {
-
- ctx.getLog().info("callback with tokenId: {}", callback.tokenId());
-
- supervisorActorRef.tell(
- new HttpSupervisor.Callback(
- callback.tokenId(), callback.body()));
-
- return Behaviors.same();
-
- })
- .onMessage(EnrichItemResponseWrapper.class, eirw -> onEnrichItemResponseWrapper(ctx, pipelineMapper, supervisorActorRef, eirw))
- .onMessage(EnrichItemCallback.class, eic -> onEnrichItemCallback(ctx, enrichItemActorRef, eic))
- .onMessage(SupervisorResponseWrapper.class, srw -> onSupervisorResponseWrapper(ctx, srw))
- .onSignal(PreRestart.class, signal -> onSignal(ctx, "ingestion actor restarting", messages))
- .onSignal(PostStop.class, signal -> onSignal(ctx, "ingestion actor stopped", messages))
- .build();
- }
-
- private static Behavior onEnrichPipelineResponseWrapper(
- ActorContext ctx, TransactionInvoker transactionInvoker,
- ActorRef datasourceActorRef,
- PipelineMapper pipelineMapper, EnrichPipelineResponseWrapper eprw,
- ActorRef supervisorActorRef,
- ActorRef enrichItemActorRef,
- ActorRef schedulerManagerActorRef,
- List> messages) {
-
- EnrichPipeline.Response response = eprw.response;
- Message> message = eprw.message;
-
- schedulerManagerActorRef.tell(new SchedulerManager.Ping(response.scheduleId(), response.tenantId()));
-
- if (response instanceof EnrichPipeline.Success) {
- ctx.getLog().info("enrich pipeline success, ack message");
- if (response instanceof EnrichPipeline.LastMessage) {
- ctx.getLog().info("last message processed on pipeline");
-
- EnrichPipeline.LastMessage lastMessage = (EnrichPipeline.LastMessage) response;
-
- schedulerManagerActorRef.tell(new SchedulerManager.LastMessage(lastMessage.scheduleId(), lastMessage.tenantId()));
- }
- message.ack();
- }
- else if (response instanceof EnrichPipeline.Failure) {
- Throwable exception =
- ((EnrichPipeline.Failure) response).exception();
- ctx.getLog().error(
- "enrich pipeline failure, nack message", exception);
- message.nack(exception);
- }
-
- List> newMessages = new ArrayList<>(messages);
- newMessages.remove(message);
-
- return initial(
- ctx, transactionInvoker, datasourceActorRef, pipelineMapper, supervisorActorRef,
- enrichItemActorRef, schedulerManagerActorRef, newMessages);
- }
-
- private static Behavior onInitPipeline(
- ActorContext ctx, TransactionInvoker transactionInvoker, PipelineMapper pipelineMapper, InitPipeline initPipeline) {
-
- Message> message = initPipeline.message;
- DataPayload dataPayload = initPipeline.dataPayload;
- GetDatasourceDTO datasource = initPipeline.datasource;
-
- ScheduleId scheduleId = new ScheduleId(UUID.fromString(dataPayload.getScheduleId()));
- VertxUtil.runOnContext(() -> transactionInvoker
- .withStatelessTransaction(dataPayload.getTenantId(), s -> s
- .createQuery("select s " +
- "from Scheduler s " +
- "join fetch s.datasource " +
- "left join fetch s.oldDataIndex " +
- "left join fetch s.newDataIndex " +
- "where s.scheduleId = :scheduleId", Scheduler.class)
- .setParameter("scheduleId", scheduleId)
- .getSingleResult()
- .map(pipelineMapper::map)
- .invoke(scheduler ->
- ctx.getSelf().tell(new CreateEnrichPipeline(message, dataPayload, datasource, scheduler))
- )
- )
- );
-
- return Behaviors.same();
- }
-
- private static Behavior onCreateEnrichPipeline(
- ActorContext ctx, ActorRef supervisorActorRef,
- CreateEnrichPipeline cep) {
-
- Message> message = cep.message;
- DataPayload dataPayload = cep.dataPayload;
- GetDatasourceDTO datasource = cep.datasource;
- SchedulerDTO scheduler = cep.scheduler;
-
- String newDataIndexName = scheduler.getNewDataIndexName();
-
- if (newDataIndexName == null) {
- newDataIndexName = scheduler.getOldDataIndexName();
- }
-
- dataPayload.setIndexName(newDataIndexName);
-
- ActorRef responseActorRef =
- ctx.messageAdapter(EnrichPipeline.Response.class, response ->
- new EnrichPipelineResponseWrapper(message, response));
-
- ActorRef enrichPipelineActorRef = ctx.spawn(
- EnrichPipeline.create(
- supervisorActorRef, responseActorRef, dataPayload, datasource, scheduler),
- "enrich-pipeline");
-
- enrichPipelineActorRef.tell(EnrichPipeline.Start.INSTANCE);
-
- return Behaviors.same();
- }
-
- private static Behavior onSupervisorResponseWrapper(
- ActorContext ctx, SupervisorResponseWrapper srw) {
-
- EnrichItemSupervisor.Response response = srw.response;
- ActorRef replyTo = srw.replyTo;
-
- if (response instanceof EnrichItemSupervisor.Body) {
- replyTo.tell(new EnrichItemCallbackResponse(((EnrichItemSupervisor.Body)response).body()));
- }
- else {
- replyTo.tell(new EnrichItemCallbackError(((EnrichItemSupervisor.Error)response).error()));
- }
-
- return Behaviors.same();
-
- }
-
- private static Behavior onEnrichItemResponseWrapper(
- ActorContext ctx,
- PipelineMapper pipelineMapper,
- ActorRef supervisorActorRef,
- EnrichItemResponseWrapper eirw) {
-
- ActorRef replyTo = eirw.replyTo;
- EnrichItemActor.EnrichItemCallbackResponse response = eirw.response;
- Map datasourcePayload = eirw.datasourcePayload;
-
- EnrichItem enrichItemEntity = response.enrichItem();
- GetEnrichItemDTO enrichItem = pipelineMapper.map(enrichItemEntity);
-
- JsonObject datasourcePayloadJson = new JsonObject(datasourcePayload);
-
- DataPayload dataPayload =
- DataPayload
- .builder()
- .acl(Map.of())
- .rawContent(datasourcePayloadJson.toString())
- .parsingDate(Instant.now().toEpochMilli())
- .documentTypes(
- datasourcePayloadJson
- .fieldNames()
- .toArray(new String[0])
- )
- .rest(datasourcePayload)
- .build();
-
- ActorRef responseActorRef =
- ctx.messageAdapter(
- EnrichItemSupervisor.Response.class,
- param -> new SupervisorResponseWrapper(param, replyTo)
- );
-
- ActorRef enrichItemActorRef =
- ctx.spawnAnonymous(EnrichItemSupervisor.create(supervisorActorRef));
-
- Long requestTimeout = enrichItem.getRequestTimeout();
-
- requestTimeout = requestTimeout != null && requestTimeout > 0 ? requestTimeout : 30_000;
-
- LocalDateTime expiredDate =
- LocalDateTime.now().plusSeconds(requestTimeout);
-
- enrichItemActorRef.tell(
- new EnrichItemSupervisor.Execute(
- enrichItem, dataPayload, expiredDate, responseActorRef
- )
- );
-
- return Behaviors.same();
- }
-
- private static Behavior onEnrichItemCallback(
- ActorContext ctx,
- ActorRef enrichItemActorRef,
- EnrichItemCallback eic) {
-
- Map datasourcePayload = eic.datasourcePayload;
-
- ActorRef enrichItemCallbackResponseActorRef =
- ctx.messageAdapter(
- EnrichItemActor.EnrichItemCallbackResponse.class,
- param -> new EnrichItemResponseWrapper(param, datasourcePayload, eic.replyTo)
- );
-
- enrichItemActorRef.tell(
- new EnrichItemActor.EnrichItemCallback(
- eic.enrichItemId, eic.tenantId, enrichItemCallbackResponseActorRef));
-
- return Behaviors.same();
-
- }
-
- private static Behavior onSignal(
- ActorContext ctx, String signalMessage, List> messages) {
-
- ctx.getLog().error(signalMessage);
-
- RuntimeException runtimeException = new RuntimeException(signalMessage);
-
- for (Message> message : messages) {
- message.nack(runtimeException);
- }
-
- return Behaviors.same();
- }
-
-}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActorSystem.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActorSystem.java
deleted file mode 100644
index d0556ee77..000000000
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/IngestionActorSystem.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package io.openk9.datasource.pipeline.actor;
-
-import akka.actor.typed.ActorRef;
-import akka.actor.typed.ActorSystem;
-import akka.actor.typed.javadsl.AskPattern;
-import io.openk9.datasource.actor.ActorSystemProvider;
-import io.openk9.datasource.processor.payload.DataPayload;
-import io.smallrye.mutiny.Uni;
-import io.vertx.core.json.JsonObject;
-import org.eclipse.microprofile.reactive.messaging.Message;
-
-import javax.annotation.PostConstruct;
-import javax.enterprise.context.ApplicationScoped;
-import javax.inject.Inject;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletionStage;
-
-@ApplicationScoped
-public class IngestionActorSystem {
-
- @PostConstruct
- public void init() {
- this.actorSystem =
- (ActorSystem)
- actorSystemProvider.getActorSystem();
- }
-
- public void startEnrichPipeline(DataPayload dataPayload, Message> message) {
- this.actorSystem.tell(new IngestionActor.IngestionMessage(dataPayload, message));
- }
-
- public void callback(String tokenId, JsonObject body) {
- actorSystem.tell(new IngestionActor.Callback(tokenId, body.toBuffer().getBytes()));
- }
-
- public Uni callEnrichItem(
- long enrichItemId, String tenantId, Map datasourcePayload) {
-
- CompletionStage future =
- AskPattern.ask(
- actorSystem,
- (ActorRef replyTo) ->
- new IngestionActor.EnrichItemCallback(enrichItemId, tenantId, datasourcePayload, replyTo),
- Duration.ofSeconds(30),
- actorSystem.scheduler());
-
- return Uni.createFrom()
- .completionStage(future)
- .map(response -> {
- if (response instanceof IngestionActor.EnrichItemCallbackResponse) {
-
- byte[] bytes =
- ((IngestionActor.EnrichItemCallbackResponse) response).jsonObject();
-
- return new JsonObject(new String(bytes));
- }
- else {
- return JsonObject.of("error", ((IngestionActor.EnrichItemCallbackError) response).message());
- }
- });
-
- }
-
- private ActorSystem actorSystem;
-
- @Inject
- ActorSystemProvider actorSystemProvider;
-
-}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/Schedulation.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/Schedulation.java
new file mode 100644
index 000000000..7cdb94bad
--- /dev/null
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/Schedulation.java
@@ -0,0 +1,252 @@
+package io.openk9.datasource.pipeline.actor;
+
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.Behavior;
+import akka.actor.typed.javadsl.AbstractBehavior;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+import io.openk9.common.util.VertxUtil;
+import io.openk9.datasource.model.DataIndex;
+import io.openk9.datasource.model.ScheduleId;
+import io.openk9.datasource.model.Scheduler;
+import io.openk9.datasource.model.Scheduler_;
+import io.openk9.datasource.pipeline.SchedulationKeyUtils;
+import io.openk9.datasource.processor.payload.DataPayload;
+import io.openk9.datasource.service.DatasourceService;
+import io.openk9.datasource.sql.TransactionInvoker;
+import io.openk9.datasource.util.CborSerializable;
+import io.quarkus.runtime.util.ExceptionUtil;
+
+import javax.persistence.criteria.CriteriaBuilder;
+import javax.persistence.criteria.CriteriaUpdate;
+import javax.persistence.criteria.Root;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+public class Schedulation extends AbstractBehavior<Schedulation.Command> {
+
+ public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
+ EntityTypeKey.create(Command.class, "schedulation");
+
+ public sealed interface Command extends CborSerializable {}
+ public enum Start implements Command {INSTANCE}
+ public enum SetDataIndex implements Command {INSTANCE}
+ public enum SetStatusFinished implements Command {INSTANCE}
+ public record Ingest(DataPayload payload, ActorRef<Response> replyTo) implements Command {}
+ private record SetScheduler(Scheduler scheduler) implements Command {}
+ private record EnrichPipelineResponseWrapper(EnrichPipeline.Response response) implements Command {}
+
+ public sealed interface Response extends CborSerializable {}
+ public record Success() implements Response {}
+ public record Failure(String error) implements Response {}
+
+ public record SchedulationKey(String tenantId, ScheduleId scheduleId) {
+ public String value() {
+ return SchedulationKeyUtils.getValue(this);
+ }
+ }
+
+ private final SchedulationKey key;
+ private final TransactionInvoker txInvoker;
+ private final DatasourceService datasourceService;
+ private final Deque<Command> lag = new ArrayDeque<>();
+ private Ingest currentIngest;
+ private Scheduler scheduler;
+
+ public Schedulation(
+ ActorContext<Command> context,
+ SchedulationKey key,
+ TransactionInvoker txInvoker,
+ DatasourceService datasourceService) {
+
+ super(context);
+ this.key = key;
+ this.txInvoker = txInvoker;
+ this.datasourceService = datasourceService;
+ context.getSelf().tell(Start.INSTANCE);
+ }
+
+ public static Behavior<Command> create(
+ SchedulationKey schedulationKey, TransactionInvoker txInvoker,
+ DatasourceService datasourceService) {
+
+ return Behaviors.setup(ctx -> new Schedulation(
+ ctx, schedulationKey, txInvoker, datasourceService));
+ }
+
+
+ @Override
+ public Receive<Command> createReceive() {
+ return init();
+ }
+
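+ // lifecycle: init (load the Scheduler entity) -> ready (accept Ingest) -> busy (pipeline running, extra commands queued in lag) -> finish (swap data index and mark the Scheduler FINISHED)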
+ private Receive<Command> init() {
+ return newReceiveBuilder()
+ .onMessageEquals(Start.INSTANCE, this::onStart)
+ .onMessage(SetScheduler.class, this::onSetScheduler)
+ .build();
+ }
+
+ private Receive<Command> ready() {
+ return newReceiveBuilder()
+ .onMessage(Ingest.class, this::onIngestReady)
+ .build();
+ }
+
+ private Receive<Command> busy() {
+ return newReceiveBuilder()
+ .onMessage(
+ EnrichPipelineResponseWrapper.class, this::onEnrichPipelineResponse)
+ .onAnyMessage(this::onBusy)
+ .build();
+ }
+
+ private Receive<Command> finish() {
+ return newReceiveBuilder()
+ .onMessageEquals(SetDataIndex.INSTANCE, this::onSetDataIndex)
+ .onMessageEquals(SetStatusFinished.INSTANCE, this::onSetStatusFinished)
+ .build();
+ }
+
+ private Behavior<Command> next() {
+ if (currentIngest.payload.isLast()) {
+ getContext().getSelf().tell(SetDataIndex.INSTANCE);
+ return finish();
+ }
+
+ currentIngest = null;
+
+ if (!lag.isEmpty()) {
+ Command command = lag.pop();
+ getContext().getSelf().tell(command);
+ }
+
+ return ready();
+ }
+
+ private Behavior<Command> onStart() {
+ VertxUtil.runOnContext(() -> txInvoker
+ .withStatelessTransaction(key.tenantId, s -> s
+ .createQuery("select s " +
+ "from Scheduler s " +
+ "join fetch s.datasource d " +
+ "left join fetch d.enrichPipeline ep " +
+ "left join fetch ep.enrichPipelineItems epi " +
+ "left join fetch epi.enrichItem " +
+ "left join fetch s.oldDataIndex " +
+ "left join fetch s.newDataIndex " +
+ "where s.scheduleId = :scheduleId", Scheduler.class)
+ .setParameter("scheduleId", key.scheduleId)
+ .getSingleResult()
+ .invoke(scheduler -> getContext().getSelf().tell(new SetScheduler(scheduler)))
+ )
+ );
+ return Behaviors.same();
+ }
+
+ private Behavior<Command> onSetScheduler(SetScheduler setScheduler) {
+ this.scheduler = setScheduler.scheduler;
+ return this.ready();
+ }
+
+ private Behavior<Command> onIngestReady(Ingest ingest) {
+ this.currentIngest = ingest;
+
+ String indexName = getIndexName();
+
+ DataPayload dataPayload = ingest.payload;
+
+ dataPayload.setIndexName(indexName);
+
+ ActorRef<EnrichPipeline.Response> responseActorRef = getContext()
+ .messageAdapter(EnrichPipeline.Response.class, EnrichPipelineResponseWrapper::new);
+
+ ActorRef<EnrichPipeline.Command> enrichPipelineActorRef = getContext().spawnAnonymous(
+ EnrichPipeline.create(key, responseActorRef, dataPayload, scheduler));
+
+ enrichPipelineActorRef.tell(EnrichPipeline.Start.INSTANCE);
+
+ return this.busy();
+ }
+
+ private Behavior<Command> onBusy(Command ingest) {
+ this.lag.add(ingest);
+ getContext().getLog().info("There are {} commands waiting", lag.size());
+ return Behaviors.same();
+ }
+
+ private Behavior<Command> onEnrichPipelineResponse(EnrichPipelineResponseWrapper eprw) {
+ EnrichPipeline.Response response = eprw.response;
+
+ if (response instanceof EnrichPipeline.Success) {
+ getContext().getLog().info("enrich pipeline success");
+ currentIngest.replyTo.tell(new Success());
+ }
+ else if (response instanceof EnrichPipeline.Failure) {
+ Throwable exception = ((EnrichPipeline.Failure) response).exception();
+ getContext().getLog().error("enrich pipeline failure", exception);
+ currentIngest.replyTo.tell(new Failure(ExceptionUtil.generateStackTrace(exception)));
+ }
+
+ return next();
+ }
+
+ private Behavior<Command> onSetDataIndex() {
+ DataIndex newDataIndex = scheduler.getNewDataIndex();
+ io.openk9.datasource.model.Datasource datasource = scheduler.getDatasource();
+
+ if (newDataIndex != null) {
+ Long newDataIndexId = newDataIndex.getId();
+ Long datasourceId = datasource.getId();
+ String tenantId = key.tenantId;
+
+ getContext().getLog().info(
+ "replacing dataindex {} for datasource {} on tenant {}",
+ newDataIndexId, datasourceId, tenantId);
+ VertxUtil.runOnContext(() -> txInvoker
+ .withTransaction(
+ tenantId,
+ s -> datasourceService.setDataIndex(datasourceId, newDataIndexId)
+ )
+ .invoke(() -> getContext().getSelf().tell(SetStatusFinished.INSTANCE))
+ );
+ }
+ else {
+ getContext().getSelf().tell(SetStatusFinished.INSTANCE);
+ }
+
+ return Behaviors.same();
+ }
+
+ private Behavior<Command> onSetStatusFinished() {
+ VertxUtil.runOnContext(() -> txInvoker.withTransaction(
+ key.tenantId,
+ s -> {
+ // restrict the status update to this schedulation's scheduler row
+ CriteriaBuilder criteriaBuilder = txInvoker.getCriteriaBuilder();
+ CriteriaUpdate<Scheduler> criteriaUpdate =
+ criteriaBuilder.createCriteriaUpdate(Scheduler.class);
+ Root<Scheduler> root = criteriaUpdate.from(Scheduler.class);
+ criteriaUpdate.set(Scheduler_.status, Scheduler.SchedulerStatus.FINISHED);
+ criteriaUpdate.where(
+ criteriaBuilder.equal(root.get(Scheduler_.scheduleId), key.scheduleId));
+ return s.createQuery(criteriaUpdate).executeUpdate();
+ })
+ );
+
+ getContext().getLog().info("Stopping " + key);
+
+ return Behaviors.stopped();
+
+ }
+
+ private String getIndexName() {
+ String indexName = null;
+ DataIndex newDataIndex = scheduler.getNewDataIndex();
+
+ if (newDataIndex != null) {
+ indexName = newDataIndex.getName();
+ }
+
+ if (indexName == null) {
+ indexName = scheduler.getOldDataIndex().getName();
+ }
+ return indexName;
+ }
+}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/EnrichItemSupervisor.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/EnrichItemSupervisor.java
index d88ff76ec..2318f4ac3 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/EnrichItemSupervisor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/EnrichItemSupervisor.java
@@ -5,7 +5,6 @@
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import io.openk9.datasource.model.EnrichItem;
-import io.openk9.datasource.pipeline.actor.dto.GetEnrichItemDTO;
import io.openk9.datasource.processor.payload.DataPayload;
import io.vertx.core.json.JsonObject;
@@ -15,12 +14,12 @@ public class EnrichItemSupervisor {
public sealed interface Command {}
public record Execute(
- GetEnrichItemDTO enrichItem, DataPayload dataPayload,
+ EnrichItem enrichItem, DataPayload dataPayload,
LocalDateTime expiredDate, ActorRef<Response> replyTo) implements Command {}
private record HttpSupervisorWrapper(HttpSupervisor.Response response, ActorRef<Response> replyTo) implements Command {}
private record GroovySupervisorWrapper(GroovyActor.Response response, ActorRef<Response> replyTo) implements Command {}
private record GroovyValidatorWrapper(
- GroovyActor.Response response, GetEnrichItemDTO enrichItem,
+ GroovyActor.Response response, EnrichItem enrichItem,
JsonObject dataPayload, LocalDateTime expiredDate, ActorRef<Response> replyTo) implements Command {}
public sealed interface Response {}
public record Body(byte[] body) implements Response {}
@@ -47,7 +46,7 @@ private static Behavior onGroovyValidatorResponse(
ActorContext ctx) {
GroovyActor.Response response = gvw.response;
- GetEnrichItemDTO enrichItem = gvw.enrichItem;
+ EnrichItem enrichItem = gvw.enrichItem;
JsonObject dataPayload = gvw.dataPayload;
if (response instanceof GroovyActor.GroovyValidateResponse) {
@@ -62,7 +61,7 @@ private static Behavior onGroovyValidatorResponse(
hsr -> new HttpSupervisorWrapper(hsr, gvw.replyTo)
);
- EnrichItem.EnrichItemType type = EnrichItem.EnrichItemType.valueOf(enrichItem.getType());
+ EnrichItem.EnrichItemType type = enrichItem.getType();
httpSupervisor.tell(
new HttpSupervisor.Call(
@@ -128,7 +127,7 @@ private static Behavior onExecute(
ActorRef<HttpSupervisor.Command> httpSupervisor,
Execute execute, ActorContext<Command> ctx) {
- GetEnrichItemDTO enrichItem = execute.enrichItem;
+ EnrichItem enrichItem = execute.enrichItem;
DataPayload dataPayload = execute.dataPayload;
ActorRef replyTo = execute.replyTo;
LocalDateTime expiredDate = execute.expiredDate;
@@ -148,7 +147,7 @@ private static Behavior onExecute(
"enrichItemConfig", enrichItemConfig
);
- switch (EnrichItem.EnrichItemType.valueOf(enrichItem.getType())) {
+ switch (enrichItem.getType()) {
case HTTP_ASYNC, HTTP_SYNC -> onHttpEnrichItem(enrichItem, payload, replyTo, httpSupervisor, expiredDate, ctx);
case GROOVY_SCRIPT -> onGroovyEnrichItem(enrichItem, payload, replyTo, ctx);
}
@@ -157,7 +156,7 @@ private static Behavior onExecute(
}
private static void onGroovyEnrichItem(
- GetEnrichItemDTO enrichItem, JsonObject dataPayload, ActorRef<Response> replyTo,
+ EnrichItem enrichItem, JsonObject dataPayload, ActorRef<Response> replyTo,
ActorContext<Command> ctx) {
ActorRef responseActorRef =
@@ -178,7 +177,7 @@ private static void onGroovyEnrichItem(
}
private static void onHttpEnrichItem(
- GetEnrichItemDTO enrichItem, JsonObject dataPayload,
+ EnrichItem enrichItem, JsonObject dataPayload,
ActorRef<Response> replyTo, ActorRef<HttpSupervisor.Command> httpSupervisor,
LocalDateTime expiredDate, ActorContext<Command> ctx) {
@@ -207,7 +206,7 @@ private static void onHttpEnrichItem(
}
- EnrichItem.EnrichItemType type = EnrichItem.EnrichItemType.valueOf(enrichItem.getType());
+ EnrichItem.EnrichItemType type = enrichItem.getType();
httpSupervisor.tell(
new HttpSupervisor.Call(
type == EnrichItem.EnrichItemType.HTTP_ASYNC,
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpProcessor.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpProcessor.java
index 74523cb16..6e6f827d2 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpProcessor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpProcessor.java
@@ -2,6 +2,7 @@
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
+import akka.actor.typed.RecipientRef;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
@@ -18,7 +19,7 @@ public class HttpProcessor extends AbstractBehavior {
public HttpProcessor(
ActorContext<Command> context,
- ActorRef<Token.Command> tokenActorRef, boolean async) {
+ RecipientRef<Token.Command> tokenActorRef, boolean async) {
super(context);
this.async = async;
this.tokenActorRef = tokenActorRef;
@@ -184,12 +185,12 @@ private Behavior waitGenerateToken(
.build();
}
- public static Behavior<Command> create(boolean async, ActorRef<Token.Command> tokenActorRef) {
+ public static Behavior<Command> create(boolean async, RecipientRef<Token.Command> tokenActorRef) {
return Behaviors.setup(param -> new HttpProcessor(param, tokenActorRef, async));
}
private final boolean async;
- private final ActorRef<Token.Command> tokenActorRef;
+ private final RecipientRef<Token.Command> tokenActorRef;
private final ActorRef<Token.Response> tokenResponseAdapter;
public sealed interface Command {}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpSupervisor.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpSupervisor.java
index c4ba1c8c4..7ad3431a5 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpSupervisor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/HttpSupervisor.java
@@ -2,29 +2,33 @@
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
+import akka.actor.typed.RecipientRef;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
-import akka.cluster.typed.ClusterSingleton;
-import akka.cluster.typed.SingletonActor;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import io.openk9.datasource.pipeline.actor.Schedulation;
-import java.time.Duration;
import java.time.LocalDateTime;
public class HttpSupervisor extends AbstractBehavior<HttpSupervisor.Command> {
- public HttpSupervisor(ActorContext<Command> context) {
+
+ public HttpSupervisor(
+ ActorContext<Command> context, Schedulation.SchedulationKey key) {
+
super(context);
- ClusterSingleton clusterSingleton =
- ClusterSingleton.get(context.getSystem());
-
- this.tokenActorRef = clusterSingleton.init(
- SingletonActor.of(
- Token.create(Duration.ofMinutes(15).toMillis()),
- "token")
- );
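+ // one Token entity per schedulation (cluster sharding) replaces the former cluster-singleton token actor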
+ ClusterSharding clusterSharding = ClusterSharding.get(context.getSystem());
+ this.tokenActorRef = clusterSharding.entityRefFor(Token.ENTITY_TYPE_KEY, key.value());
+ }
+
+ public static Behavior<Command> create(Schedulation.SchedulationKey key) {
+ return Behaviors
+ .supervise(Behaviors.setup(ctx -> new HttpSupervisor(ctx, key)))
+ .onFailure(SupervisorStrategy.resume());
}
@Override
@@ -70,18 +74,13 @@ private Behavior onCall(Call call) {
}
- public static Behavior<Command> create() {
- return Behaviors
- .supervise(Behaviors.setup(HttpSupervisor::new))
- .onFailure(SupervisorStrategy.resume());
- }
-
private Behavior onCallback(Callback callback) {
tokenActorRef.tell(new Token.Callback(callback.tokenId, callback.jsonObject));
return Behaviors.same();
}
- private final ActorRef<Token.Command> tokenActorRef;
+ private final RecipientRef<Token.Command> tokenActorRef;
+
public sealed interface Command {}
public record Call(
boolean async, String url, byte[] jsonObject,
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/Token.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/Token.java
index 9f56a23c5..53694c42f 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/Token.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/Token.java
@@ -4,8 +4,12 @@
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
+import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
+import akka.actor.typed.javadsl.Receive;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+import io.openk9.datasource.pipeline.actor.Schedulation;
import io.openk9.datasource.util.CborSerializable;
import java.time.Duration;
@@ -14,7 +18,10 @@
import java.util.Map;
import java.util.UUID;
-public class Token {
+public class Token extends AbstractBehavior<Token.Command> {
+
+ public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
+ EntityTypeKey.create(Token.Command.class, "tokenKey");
public sealed interface Command extends CborSerializable {}
public record Generate(LocalDateTime expiredDate, ActorRef replyTo) implements Command {}
@@ -25,59 +32,58 @@ public record TokenGenerated(String token) implements Response {}
public record TokenCallback(byte[] jsonObject) implements Response {}
public enum TokenState implements Response {EXPIRED, VALID}
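+ // token handed out to external enrich services; TokenUtils serializes it so the HTTP callback can be routed back to the owning schedulation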
+ public record SchedulationToken(String tenantId, String scheduleId, String token) {}
+
private record TokenInfo(
LocalDateTime createDate, LocalDateTime expiredDate, ActorRef replyTo) implements CborSerializable {}
- public static Behavior create() {
- return create(-1);
+ private final Cancellable cancellable;
+ private final Schedulation.SchedulationKey key;
+ private final Map<String, TokenInfo> tokens = new HashMap<>();
+
+ public Token(ActorContext<Command> context, Schedulation.SchedulationKey key) {
+ super(context);
+
+ this.key = key;
+ this.cancellable = getContext()
+ .getSystem()
+ .scheduler()
+ .scheduleAtFixedRate(
+ Duration.ZERO, Duration.ofMinutes(15),
+ () -> getContext().getSelf().tell(Tick.INSTANCE),
+ getContext().getExecutionContext());
}
- public static Behavior create(long validityTokenMillis) {
- return Behaviors.supervise(
- Behaviors
- .setup(ctx -> initial(new HashMap<>(), ctx, null)))
- .onFailure(SupervisorStrategy.resume());
+ @Override
+ public Receive<Command> createReceive() {
+ return newReceiveBuilder()
+ .onMessage(Generate.class, this::onGenerate)
+ .onMessage(Callback.class, this::onCallback)
+ .onMessage(Tick.class, tick -> onTick())
+ .build();
}
- private static Behavior initial(
- Map tokens,
- ActorContext ctx, Cancellable cancellable) {
-
- Cancellable newCancellable;
-
- if (cancellable == null) {
- newCancellable = ctx.scheduleOnce(
- Duration.ofMinutes(15), ctx.getSelf(), Tick.INSTANCE);
- }
- else {
- newCancellable = cancellable;
- }
+ public static Behavior<Command> create(Schedulation.SchedulationKey key) {
return Behaviors
- .receive(Command.class)
- .onMessage(Generate.class, generate -> onGenerate(
- generate.replyTo(), generate.expiredDate(), tokens, ctx, newCancellable))
- .onMessage(Callback.class, callback -> onCallback(
- callback.token(), callback.jsonObject, tokens, ctx, newCancellable))
- .onMessage(Tick.class, tick -> onTick(tokens, ctx, newCancellable))
- .build();
+ .supervise(Behaviors.setup(ctx -> new Token(ctx, key)))
+ .onFailure(SupervisorStrategy.resume());
}
- private static Behavior onCallback(
- String token, byte[] jsonObject,
- Map tokens, ActorContext ctx,
- Cancellable cancellable) {
+ private Behavior<Command> onCallback(Callback callback) {
+
+ String token = callback.token;
TokenInfo tokenInfo = tokens.get(token);
if (tokenInfo == null) {
- ctx.getLog().warn("Token not found: {}", token);
+ getContext().getLog().warn("Token not found: {}", token);
return Behaviors.same();
}
if (isValid(tokenInfo)) {
- ctx.getLog()
+ getContext().getLog()
.info(
"Token found: {}, elapsed: {} ms",
token, Duration.between(
@@ -85,63 +91,58 @@ private static Behavior onCallback(
LocalDateTime.now()
).toMillis());
- tokenInfo.replyTo.tell(new TokenCallback(jsonObject));
+ tokenInfo.replyTo.tell(new TokenCallback(callback.jsonObject));
}
else {
- Map newMap = new HashMap<>(tokens);
- newMap.remove(token, tokenInfo);
- ctx.getLog().warn("Token expired: {}", token);
+ tokens.remove(token, tokenInfo);
+ getContext().getLog().warn("Token expired: {}", token);
tokenInfo.replyTo.tell(TokenState.EXPIRED);
- return initial(newMap, ctx, cancellable);
}
return Behaviors.same();
}
- private static Behavior onTick(
- Map tokens,
- ActorContext ctx, Cancellable cancellable) {
+ private Behavior<Command> onTick() {
- ctx.getLog().info("Start token cleanup");
+ getContext().getLog().info("Start token cleanup");
if (tokens.isEmpty()) {
return Behaviors.same();
}
- Map<String, TokenInfo> newTokens = new HashMap<>();
+ int expiredCount = 0;
for (Map.Entry<String, TokenInfo> entry : tokens.entrySet()) {
TokenInfo value = entry.getValue();
if (isValid(value)) {
- newTokens.put(entry.getKey(), value);
+ // still valid: the entry stays in the map
}
else {
- ctx.getLog().warn("Token expired: {}", entry.getKey());
+ getContext().getLog().warn("Token expired: {}", entry.getKey());
value.replyTo.tell(TokenState.EXPIRED);
+ expiredCount++;
}
}
- ctx.getLog().info("token expire count: {}", tokens.size() - newTokens.size());
-
- return initial(newTokens, ctx, cancellable);
+ // drop the expired entries that were notified above
+ tokens.values().removeIf(Token::isExpired);
+ getContext().getLog().info("token expire count: {}", expiredCount);
+ return Behaviors.same();
}
- private static Behavior onGenerate(
- ActorRef replyTo, LocalDateTime expiredDate,
- Map tokens, ActorContext ctx,
- Cancellable cancellable) {
+ private Behavior<Command> onGenerate(Generate generate) {
- String token = generateToken();
+ SchedulationToken schedulationToken = generateToken();
- Map newTokens = new HashMap<>(tokens);
+ ActorRef<Response> replyTo = generate.replyTo;
- newTokens.put(token, new TokenInfo(LocalDateTime.now(), expiredDate, replyTo));
+ tokens.put(
+ schedulationToken.token,
+ new TokenInfo(LocalDateTime.now(), generate.expiredDate, replyTo));
- replyTo.tell(new TokenGenerated(token));
+ replyTo.tell(new TokenGenerated(TokenUtils.encode(schedulationToken)));
- return initial(newTokens, ctx, cancellable);
+ return Behaviors.same();
}
private static boolean isValid(TokenInfo tokenInfo) {
@@ -152,8 +153,9 @@ private static boolean isExpired(TokenInfo tokenInfo) {
return tokenInfo.expiredDate().isBefore(LocalDateTime.now());
}
- private static String generateToken() {
- return UUID.randomUUID().toString();
+ private SchedulationToken generateToken() {
+ return new SchedulationToken(
+ key.tenantId(), key.scheduleId().getValue(), UUID.randomUUID().toString());
}
}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/TokenUtils.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/TokenUtils.java
new file mode 100644
index 000000000..4f1b52c41
--- /dev/null
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/actor/enrichitem/TokenUtils.java
@@ -0,0 +1,20 @@
+package io.openk9.datasource.pipeline.actor.enrichitem;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.Json;
+
+import java.util.Base64;
+
+public class TokenUtils {
+
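+ // a SchedulationToken travels as a single opaque path parameter: JSON-encode it, then Base64-encode the bytes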
+ public static String encode(Token.SchedulationToken token) {
+ String jsonToken = Json.encode(token);
+ return Base64.getEncoder().encodeToString(jsonToken.getBytes());
+ }
+
+ public static Token.SchedulationToken decode(String value) {
+ byte[] jsonToken = Base64.getDecoder().decode(value);
+ return Json.decodeValue(Buffer.buffer(jsonToken), Token.SchedulationToken.class);
+ }
+
+}
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/resource/PipelineResource.java b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/resource/PipelineResource.java
index 55de186d9..783bde954 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/resource/PipelineResource.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/pipeline/resource/PipelineResource.java
@@ -1,7 +1,13 @@
package io.openk9.datasource.pipeline.resource;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.EntityRef;
import io.openk9.auth.tenant.TenantResolver;
-import io.openk9.datasource.pipeline.actor.IngestionActorSystem;
+import io.openk9.datasource.actor.ActorSystemProvider;
+import io.openk9.datasource.pipeline.SchedulationKeyUtils;
+import io.openk9.datasource.pipeline.actor.enrichitem.Token;
+import io.openk9.datasource.pipeline.actor.enrichitem.TokenUtils;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
@@ -19,7 +25,18 @@ public class PipelineResource {
public void callback(
@PathParam("token-id")String tokenId, JsonObject body) {
- actorSystem.callback(tokenId, body);
+ ActorSystem<?> actorSystem = actorSystemProvider.getActorSystem();
+
+ ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
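+ // the path token is a Base64-encoded SchedulationToken; decode it to locate the Token entity that issued it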
+ Token.SchedulationToken schedulationToken = TokenUtils.decode(tokenId);
+
+ EntityRef<Token.Command> tokenEntityRef = clusterSharding.entityRefFor(
+ Token.ENTITY_TYPE_KEY,
+ SchedulationKeyUtils.getValue(
+ schedulationToken.tenantId(), schedulationToken.scheduleId()));
+
+ tokenEntityRef.tell(
+ new Token.Callback(schedulationToken.token(), body.toBuffer().getBytes()));
}
@@ -30,14 +47,12 @@ public Uni callEnrichItem(
@PathParam("enrich-item-id") long enrichItemId,
JsonObject datasourcePayload) {
- return actorSystem.callEnrichItem(
- enrichItemId, tenantResolver.getTenantName(),
- datasourcePayload.getMap());
+ return Uni.createFrom().nothing();
}
@Inject
- IngestionActorSystem actorSystem;
+ ActorSystemProvider actorSystemProvider;
@Inject
TenantResolver tenantResolver;
diff --git a/core/app/datasource/src/main/java/io/openk9/datasource/processor/DatasourceProcessor.java b/core/app/datasource/src/main/java/io/openk9/datasource/processor/DatasourceProcessor.java
index 40a9aac51..f2b0ca7f4 100644
--- a/core/app/datasource/src/main/java/io/openk9/datasource/processor/DatasourceProcessor.java
+++ b/core/app/datasource/src/main/java/io/openk9/datasource/processor/DatasourceProcessor.java
@@ -18,8 +18,13 @@
package io.openk9.datasource.processor;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.EntityRef;
+import io.openk9.datasource.actor.ActorSystemProvider;
import io.openk9.datasource.mapper.IngestionPayloadMapper;
-import io.openk9.datasource.pipeline.actor.IngestionActorSystem;
+import io.openk9.datasource.pipeline.SchedulationKeyUtils;
+import io.openk9.datasource.pipeline.actor.Schedulation;
import io.openk9.datasource.processor.payload.DataPayload;
import io.openk9.datasource.processor.payload.IngestionIndexWriterPayload;
import io.openk9.datasource.processor.payload.IngestionPayload;
@@ -30,6 +35,8 @@
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
+import java.time.Duration;
+import java.util.concurrent.CompletionStage;
@ApplicationScoped
public class DatasourceProcessor {
@@ -45,15 +52,39 @@ public Uni process(Message message) {
DataPayload dataPayload =
ingestionPayloadMapper.map(ingestionPayload);
- ingestionActorSystem.startEnrichPipeline(
- dataPayload, message);
+ ActorSystem<?> actorSystem = actorSystemProvider.getActorSystem();
+ ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);
- return Uni.createFrom().voidItem();
+ String tenantId = dataPayload.getTenantId();
+ String scheduleId = dataPayload.getScheduleId();
+
+ EntityRef<Schedulation.Command> schedulationEntityRef = clusterSharding.entityRefFor(
+ Schedulation.ENTITY_TYPE_KEY, SchedulationKeyUtils.getValue(tenantId, scheduleId));
+
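+ // ask the sharded Schedulation entity to process the payload and ack/nack the incoming message based on its reply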
+ CompletionStage<Schedulation.Response> completionStage = schedulationEntityRef.ask(
+ replyTo -> new Schedulation.Ingest(dataPayload, replyTo), Duration.ofMinutes(10));
+
+ return Uni
+ .createFrom()
+ .completionStage(completionStage)
+ .onItemOrFailure()
+ .transform((response, throwable) -> {
+ if (throwable != null) {
+ return message.nack(throwable);
+ }
+ if (response instanceof Schedulation.Failure) {
+ return message.nack(new RuntimeException(((Schedulation.Failure) response).error()));
+ }
+ else {
+ return message.ack();
+ }
+ })
+ .flatMap(Uni.createFrom()::completionStage);
}
@Inject
- IngestionActorSystem ingestionActorSystem;
+ ActorSystemProvider actorSystemProvider;
@Inject
IngestionPayloadMapper ingestionPayloadMapper;