Skip to content

Commit

Permalink
Issue #495:
Browse files Browse the repository at this point in the history
 - implements sharding
 - refactor
  • Loading branch information
mrk-vi committed Jun 27, 2023
1 parent b2cb554 commit 1bb3c93
Show file tree
Hide file tree
Showing 18 changed files with 548 additions and 622 deletions.
4 changes: 4 additions & 0 deletions core/app/datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-typed_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster-sharding-typed_${scala.version}</artifactId>
</dependency>
<dependency>
<groupId>com.lightbend.akka.management</groupId>
<artifactId>akka-management_${scala.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package io.openk9.datasource.actor;

import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.typed.Cluster;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import io.openk9.datasource.model.ScheduleId;
import io.openk9.datasource.pipeline.actor.Schedulation;
import io.openk9.datasource.pipeline.actor.enrichitem.Token;
import io.openk9.datasource.service.DatasourceService;
import io.openk9.datasource.sql.TransactionInvoker;
import io.quarkus.arc.Priority;
import io.quarkus.arc.properties.IfBuildProperty;
import org.jboss.logging.Logger;
Expand All @@ -11,6 +18,7 @@
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import java.util.UUID;

@Dependent
public class ActorSystemConfig {
Expand Down Expand Up @@ -41,7 +49,41 @@ public ActorSystemInitializer cluster() {
};
}

@Produces
@ApplicationScoped
public ActorSystemInitializer clusterSharding() {
logger.info("init cluster sharding");
return actorSystem -> {
ClusterSharding clusterSharding = ClusterSharding.get(actorSystem);

clusterSharding.init(Entity.of(Schedulation.ENTITY_TYPE_KEY, entityCtx -> {
String entityId = entityCtx.getEntityId();
String[] strings = entityId.split("#");
Schedulation.SchedulationKey key =
new Schedulation.SchedulationKey(
strings[0],
new ScheduleId(UUID.fromString(strings[1])));
return Schedulation.create(key, transactionInvoker, datasourceService);
}));

clusterSharding.init(Entity.of(Token.ENTITY_TYPE_KEY, entityCtx -> {
String entityId = entityCtx.getEntityId();
String[] strings = entityId.split("#");
Schedulation.SchedulationKey key =
new Schedulation.SchedulationKey(
strings[0],
new ScheduleId(UUID.fromString(strings[1])));
return Token.create(key);
}));

};
}

@Inject
Logger logger;
@Inject
TransactionInvoker transactionInvoker;
@Inject
DatasourceService datasourceService;

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.openk9.datasource.actor;

import akka.actor.typed.ActorSystem;
import akka.cluster.typed.ClusterSingleton;
import akka.actor.typed.javadsl.Behaviors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.openk9.datasource.pipeline.actor.IngestionActor;
import io.openk9.datasource.pipeline.actor.mapper.PipelineMapper;
import io.quarkus.runtime.Startup;
import org.eclipse.microprofile.config.inject.ConfigProperty;

Expand All @@ -25,7 +23,7 @@ void init() {
Config defaultConfig = ConfigFactory.load(clusterFile);
Config config = defaultConfig.withFallback(ConfigFactory.load());

actorSystem = ActorSystem.create(IngestionActor.create(pipelineMapper), "datasource", config);
actorSystem = ActorSystem.create(Behaviors.empty(), "datasource", config);

for (ActorSystemInitializer actorSystemInitializer : actorSystemInitializerInstance) {
actorSystemInitializer.init(actorSystem);
Expand All @@ -42,16 +40,10 @@ public ActorSystem<?> getActorSystem() {
return actorSystem;
}

public ClusterSingleton getClusterSingleton() {
return ClusterSingleton.get(actorSystem);
}

private ActorSystem<?> actorSystem;

@Inject
Instance<ActorSystemInitializer> actorSystemInitializerInstance;
@Inject
PipelineMapper pipelineMapper;
@ConfigProperty(name = "akka.cluster.file")
String clusterFile;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.UUID;

public class Scheduler {
public class JobTriggerer {

public sealed interface Command extends CborSerializable {}
public record ScheduleDatasource(String tenantName, long datasourceId, boolean schedulable, String cron) implements Command {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
public class SchedulerInitializerActor {

public void scheduleDataSource(String tenantName, long datasourceId, boolean schedulable, String cron) {
getSchedulerRef().tell(new Scheduler.ScheduleDatasource(tenantName, datasourceId, schedulable, cron));
getSchedulerRef().tell(new JobTriggerer.ScheduleDatasource(tenantName, datasourceId, schedulable, cron));
}

public void unScheduleDataSource(String tenantName, long datasourceId) {
getSchedulerRef().tell(new Scheduler.UnScheduleDatasource(tenantName, datasourceId));
getSchedulerRef().tell(new JobTriggerer.UnScheduleDatasource(tenantName, datasourceId));
}

public void triggerDataSource(
String tenantName, long datasourceId, boolean startFromFirst) {
getSchedulerRef().tell(new Scheduler.TriggerDatasource(tenantName, datasourceId, startFromFirst));
getSchedulerRef().tell(new JobTriggerer.TriggerDatasource(tenantName, datasourceId, startFromFirst));
}

private ActorRef<Scheduler.Command> getSchedulerRef() {
private ActorRef<JobTriggerer.Command> getSchedulerRef() {
return ClusterSingleton.get(actorSystemProvider.getActorSystem())
.init(
SingletonActor.of(
Scheduler.create(
JobTriggerer.create(
httpPluginDriverClient, transactionInvoker
), "scheduler")
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.openk9.datasource.pipeline;

import io.openk9.datasource.model.ScheduleId;
import io.openk9.datasource.pipeline.actor.Schedulation;

import java.util.UUID;

public class SchedulationKeyUtils {

public static String getValue(Schedulation.SchedulationKey key) {
return getValue(key.tenantId(), key.scheduleId().getValue());
}

public static String getValue(String tenantId, String scheduleId) {
return tenantId + "#" + scheduleId;
}

public static Schedulation.SchedulationKey getKey(
String tenantId, String scheduleId) {

return new Schedulation.SchedulationKey(
tenantId, new ScheduleId(UUID.fromString(scheduleId)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.SingletonActor;
import io.openk9.common.util.collection.Collections;
import io.openk9.datasource.model.Datasource;
import io.openk9.datasource.model.EnrichItem;
import io.openk9.datasource.pipeline.actor.dto.GetDatasourceDTO;
import io.openk9.datasource.pipeline.actor.dto.GetEnrichItemDTO;
import io.openk9.datasource.pipeline.actor.dto.SchedulerDTO;
import io.openk9.datasource.model.EnrichPipelineItem;
import io.openk9.datasource.model.Scheduler;
import io.openk9.datasource.pipeline.actor.enrichitem.EnrichItemSupervisor;
import io.openk9.datasource.pipeline.actor.enrichitem.HttpSupervisor;
import io.openk9.datasource.processor.payload.DataPayload;
Expand All @@ -34,7 +34,7 @@ private record IndexWriterResponseWrapper(
IndexWriterActor.Response response) implements Command {}
private record EnrichItemSupervisorResponseWrapper(
EnrichItemSupervisor.Response response) implements Command {}
private record EnrichItemError(GetEnrichItemDTO enrichItem, Throwable exception) implements Command {}
private record EnrichItemError(EnrichItem enrichItem, Throwable exception) implements Command {}
private record InternalResponseWrapper(byte[] jsonObject) implements Command {}
private record InternalError(String error) implements Command {}
public sealed interface Response {
Expand All @@ -48,9 +48,9 @@ public record LastMessage(String scheduleId, String tenantId) implements Success
public record Failure(Throwable exception, String scheduleId, String tenantId) implements Response {}

public static Behavior<Command> create(
ActorRef<HttpSupervisor.Command> supervisorActorRef,
Schedulation.SchedulationKey key,
ActorRef<Response> replyTo, DataPayload dataPayload,
GetDatasourceDTO datasource, SchedulerDTO scheduler) {
Scheduler scheduler) {

return Behaviors.setup(ctx -> {

Expand All @@ -61,14 +61,20 @@ public static Behavior<Command> create(
IndexWriterActor.Response.class,
IndexWriterResponseWrapper::new);

Datasource datasource = scheduler.getDatasource();
io.openk9.datasource.model.EnrichPipeline enrichPipeline =
datasource.getEnrichPipeline();
log.info("start pipeline for datasource with id {}", datasource.getId());

ActorRef<HttpSupervisor.Command> supervisorActorRef =
ctx.spawnAnonymous(HttpSupervisor.create(key));

return Behaviors.receive(Command.class)
.onMessageEquals(
Start.INSTANCE, () ->
initPipeline(
ctx, supervisorActorRef, responseActorRef, replyTo, dataPayload,
scheduler, datasource.getEnrichItems())
scheduler, enrichPipeline.getEnrichPipelineItems())
)
.build();
});
Expand All @@ -79,11 +85,12 @@ private static Behavior<Command> initPipeline(
ActorRef<HttpSupervisor.Command> supervisorActorRef,
ActorRef<IndexWriterActor.Response> responseActorRef,
ActorRef<Response> replyTo, DataPayload dataPayload,
SchedulerDTO scheduler,
Set<GetEnrichItemDTO> enrichPipelineItems) {
Scheduler scheduler, Set<EnrichPipelineItem> enrichPipelineItems) {

Logger logger = ctx.getLog();

String scheduleId = scheduler.getScheduleId().getValue();

if (enrichPipelineItems.isEmpty()) {

logger.info("pipeline is empty, start index writer");
Expand Down Expand Up @@ -114,16 +121,16 @@ private static Behavior<Command> initPipeline(

if (response instanceof IndexWriterActor.Success) {
if (dataPayload.isLast()) {
replyTo.tell(new LastMessage(scheduler.getScheduleId(), dataPayload.getTenantId()));
replyTo.tell(new LastMessage(scheduleId, dataPayload.getTenantId()));
}
else {
replyTo.tell(new AnyMessage(scheduler.getScheduleId(), dataPayload.getTenantId()));
replyTo.tell(new AnyMessage(scheduleId, dataPayload.getTenantId()));
}
}
else if (response instanceof IndexWriterActor.Failure) {
replyTo.tell(new Failure(
((IndexWriterActor.Failure) response).exception(),
scheduler.getScheduleId(),
scheduleId,
dataPayload.getTenantId()));
}

Expand All @@ -133,15 +140,15 @@ else if (response instanceof IndexWriterActor.Failure) {

}

GetEnrichItemDTO enrichItem = Collections.head(enrichPipelineItems);
Set<GetEnrichItemDTO> tail = Collections.tail(enrichPipelineItems);
EnrichPipelineItem enrichPipelineItem = Collections.head(enrichPipelineItems);
EnrichItem enrichItem = enrichPipelineItem.getEnrichItem();
Set<EnrichPipelineItem> tail = Collections.tail(enrichPipelineItems);


logger.info("start enrich for enrichItem with id {}", enrichItem.getId());

String jsonPath = enrichItem.getJsonPath();
EnrichItem.BehaviorMergeType behaviorMergeType =
EnrichItem.BehaviorMergeType.valueOf(enrichItem.getBehaviorMergeType());
EnrichItem.BehaviorMergeType behaviorMergeType = enrichItem.getBehaviorMergeType();

ActorRef<EnrichItemSupervisor.Command> enrichItemSupervisorRef =
ctx.spawnAnonymous(EnrichItemSupervisor.create(supervisorActorRef));
Expand Down Expand Up @@ -177,10 +184,9 @@ else if (r instanceof EnrichItemSupervisor.Error) {
return Behaviors.receive(Command.class)
.onMessage(EnrichItemError.class, param -> {

GetEnrichItemDTO enrichItemError = param.enrichItem();
EnrichItem enrichItemError = param.enrichItem();

EnrichItem.BehaviorOnError behaviorOnError =
EnrichItem.BehaviorOnError.valueOf(enrichItem.getBehaviorOnError());
EnrichItem.BehaviorOnError behaviorOnError = enrichItemError.getBehaviorOnError();

switch (behaviorOnError) {
case SKIP -> {
Expand Down
Loading

0 comments on commit 1bb3c93

Please sign in to comment.