Skip to content

Commit

Permalink
refactor(core): use interface HasUID when possible
Browse files Browse the repository at this point in the history
This commit adds a new HasUID interface for explicitly
managing classes that expose a unique identifier
  • Loading branch information
fhussonnois committed Oct 3, 2024
1 parent f93013a commit 0d17a81
Show file tree
Hide file tree
Showing 29 changed files with 132 additions and 99 deletions.
21 changes: 21 additions & 0 deletions core/src/main/java/io/kestra/core/models/HasUID.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.kestra.core.models;

/**
* A UID is used to uniquely identify an entity across an entire Kestra's cluster.
* <p>
* A UID is either a unique random ID or composed of a combination of entity properties such as
* the associated tenant, namespace, and identifier.
* <p>
* For Kestra's queuing mechanism the UID can be used as routing/or partitioning key.
*/
public interface HasUID {

/**
* Gets the UID attached to this entity.
* <p>
* Be careful when modifying the implementation of this method for subclasses, as it should be consistent over time.
*
* @return the string uid.
*/
String uid();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -35,11 +36,9 @@
@JsonSubTypes.Type(value = ExecutionKilledExecution.class, name = "execution"),
@JsonSubTypes.Type(value = ExecutionKilledTrigger.class, name = "trigger"),
})
abstract public class ExecutionKilled implements TenantInterface {
abstract public class ExecutionKilled implements TenantInterface, HasUID {
abstract public String getType();

abstract public String uid();

public enum State {
REQUESTED,
EXECUTED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public boolean isEqual(TriggerContext triggerContext) {
triggerContext.getTriggerId().equals(this.triggerId);
}


@Override
public String uid() {
return IdUtils.fromParts(
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
Expand Down Expand Up @@ -49,7 +50,7 @@
@ToString
@EqualsAndHashCode
@FlowValidation
public class Flow extends AbstractFlow {
public class Flow extends AbstractFlow implements HasUID {
private static final ObjectMapper NON_DEFAULT_OBJECT_MAPPER = JacksonMapper.ofYaml()
.copy()
.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT);
Expand Down Expand Up @@ -121,6 +122,9 @@ public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
}


/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Flow.uid(this.getTenantId(), this.getNamespace(), this.getId(), Optional.ofNullable(this.revision));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.kestra.core.models.namespaces;

import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;

public interface NamespaceInterface extends DeletedInterface {
public interface NamespaceInterface extends DeletedInterface, HasUID {
String getId();


/** {@inheritDoc **/
@Override
default String uid() {
return this.getId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.models.validations.ManualConstraintViolation;
Expand All @@ -33,7 +34,7 @@
@Introspected
@ToString
@EqualsAndHashCode
public class Template implements DeletedInterface, TenantInterface {
public class Template implements DeletedInterface, TenantInterface, HasUID {
private static final ObjectMapper YAML_MAPPER = JacksonMapper.ofYaml().copy()
.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
@Override
Expand Down Expand Up @@ -70,6 +71,9 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Builder.Default
private final boolean deleted = false;


/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return Template.uid(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.kestra.core.models.topologies;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasUID;
import lombok.Builder;
import lombok.Value;

import jakarta.validation.constraints.NotNull;

@Value
@Builder
public class FlowTopology {
public class FlowTopology implements HasUID {
@NotNull
FlowNode source;

Expand All @@ -18,6 +19,9 @@ public class FlowTopology {
@NotNull
FlowNode destination;


/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
// we use destination as prefix to enable prefixScan on FlowTopologyUpdateTransformer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.models.triggers;

import io.kestra.core.models.HasUID;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
Expand All @@ -19,7 +20,7 @@
@EqualsAndHashCode(callSuper = true)
@Getter
@NoArgsConstructor
public class Trigger extends TriggerContext {
public class Trigger extends TriggerContext implements HasUID {
@Nullable
private String executionId;

Expand Down Expand Up @@ -48,6 +49,9 @@ protected Trigger(TriggerBuilder<?, ?> b) {
return new TriggerBuilderImpl();
}


/** {@inheritDoc **/
@Override
public String uid() {
return uid(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.models.triggers.multipleflows;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.utils.IdUtils;
import lombok.Builder;
Expand All @@ -13,7 +14,7 @@

@Value
@Builder
public class MultipleConditionWindow {
public class MultipleConditionWindow implements HasUID {
String tenantId;

String namespace;
Expand All @@ -28,6 +29,9 @@ public class MultipleConditionWindow {

Map<String, Boolean> results;


/** {@inheritDoc **/
@Override
@JsonIgnore
public String uid() {
return IdUtils.fromParts(
Expand Down
43 changes: 5 additions & 38 deletions core/src/main/java/io/kestra/core/queues/QueueService.java
Original file line number Diff line number Diff line change
@@ -1,67 +1,34 @@
package io.kestra.core.queues;

import io.kestra.core.models.HasUID;
import io.kestra.core.models.Setting;
import io.kestra.core.models.executions.*;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.templates.Template;
import io.kestra.core.models.topologies.FlowTopology;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow;
import io.kestra.core.runners.*;
import io.kestra.core.server.ServiceInstance;
import jakarta.inject.Singleton;

@Singleton
public class QueueService {
public String key(Object object) {
if (object.getClass() == Execution.class) {
if (object instanceof HasUID hasUID) {
return hasUID.uid();
} else if (object.getClass() == Execution.class) {
return ((Execution) object).getId();
} else if (object.getClass() == WorkerTask.class) {
return ((WorkerTask) object).getTaskRun().getId();
} else if (object.getClass() == WorkerTaskRunning.class) {
return ((WorkerTaskRunning) object).getTaskRun().getId();
} else if (object.getClass() == WorkerInstance.class) {
return ((WorkerInstance) object).getWorkerUuid();
} else if (object.getClass() == WorkerTaskResult.class) {
return ((WorkerTaskResult) object).getTaskRun().getId();
} else if (object.getClass() == LogEntry.class) {
return null;
} else if (object.getClass() == Flow.class) {
return ((Flow) object).uid();
} else if (object.getClass() == Template.class) {
return ((Template) object).uid();
} else if (object instanceof ExecutionKilled) {
return ((ExecutionKilled) object).uid();
} else if (object.getClass() == Trigger.class) {
return ((Trigger) object).uid();
} else if (object.getClass() == MultipleConditionWindow.class) {
return ((MultipleConditionWindow) object).uid();
} else if (object.getClass() == SubflowExecution.class) {
return ((SubflowExecution<?>) object).getExecution().getId();
} else if (object.getClass() == SubflowExecutionResult.class) {
return ((SubflowExecutionResult) object).getExecutionId();
} else if (object.getClass() == ExecutionDelay.class) {
return ((ExecutionDelay) object).uid();
} else if (object.getClass() == ExecutorState.class) {
return ((ExecutorState) object).getExecutionId();
} else if (object.getClass() == Setting.class) {
return ((Setting) object).getKey();
} else if (object.getClass() == Executor.class) {
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == FlowTopology.class) {
return ((FlowTopology) object).uid();
return ((Executor) object).getExecution().getId();
} else if (object.getClass() == MetricEntry.class) {
return null;
} else if (object.getClass() == WorkerTrigger.class) {
return ((WorkerTrigger) object).getTriggerContext().uid();
} else if (object.getClass() == WorkerTriggerRunning.class) {
return ((WorkerTriggerRunning) object).getTriggerContext().uid();
} else if (object.getClass() == WorkerTriggerResult.class) {
return ((WorkerTriggerResult) object).getTriggerContext().uid();
} else if (object.getClass() == ExecutionQueued.class) {
return ((ExecutionQueued) object).uid();
} else if (object.getClass() == ServiceInstance.class) {
return ((ServiceInstance) object).id();
} else {
throw new IllegalArgumentException("Unknown type '" + object.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -113,7 +112,7 @@ default ServiceStateTransition.Response mayTransitionServiceTo(final ServiceInst
final String reason) {
// This default method is not transactional and may lead to inconsistent state transition.
synchronized (this) {
Optional<ServiceInstance> optional = findById(instance.id());
Optional<ServiceInstance> optional = findById(instance.uid());
final ImmutablePair<ServiceInstance, ServiceInstance> beforeAndAfter;
// UNKNOWN service
if (optional.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.kestra.core.models.HasUID;
import io.kestra.core.models.flows.State;
import jakarta.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
Expand All @@ -12,7 +13,7 @@
@Value
@AllArgsConstructor
@Builder
public class ExecutionDelay {
public class ExecutionDelay implements HasUID {
@NotNull
String taskRunId;

Expand All @@ -26,6 +27,7 @@ public class ExecutionDelay {

@NotNull DelayType delayType;

@Override
@JsonIgnore
public String uid() {
return String.join("_", executionId, taskRunId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.core.runners;

import io.kestra.core.models.HasUID;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.utils.IdUtils;
import lombok.AllArgsConstructor;
Expand All @@ -12,7 +13,7 @@
@Value
@AllArgsConstructor
@Builder
public class ExecutionQueued {
public class ExecutionQueued implements HasUID {
String tenantId;

@NotNull
Expand All @@ -37,6 +38,8 @@ public static ExecutionQueued fromExecutionRunning(ExecutionRunning executionRun
);
}

/** {@inheritDoc **/
@Override
public String uid() {
return IdUtils.fromParts(this.tenantId, this.namespace, this.flowId, this.execution.getId());
}
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/java/io/kestra/core/runners/WorkerJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.HasUID;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true, include = JsonTypeInfo.As.EXISTING_PROPERTY, defaultImpl = WorkerTask.class)
@JsonSubTypes({
@JsonSubTypes.Type(value = WorkerTask.class, name = "task"),
@JsonSubTypes.Type(value = WorkerTrigger.class, name = "trigger")
})
public abstract class WorkerJob {
public abstract class WorkerJob implements HasUID {
abstract public String getType();

abstract public String uid();

abstract public String taskRunId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.kestra.core.models.HasUID;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
Expand All @@ -16,7 +17,7 @@
@Data
@SuperBuilder
@NoArgsConstructor
public abstract class WorkerJobRunning {
public abstract class WorkerJobRunning implements HasUID {
@NotNull
private WorkerInstance workerInstance;

Expand All @@ -25,5 +26,4 @@ public abstract class WorkerJobRunning {

abstract public String getType();

abstract public String uid();
}
8 changes: 3 additions & 5 deletions core/src/main/java/io/kestra/core/runners/WorkerTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ public Logger logger() {
);
}

/**
* {@inheritDoc}
*/
@Override
public String uid() {
return this.taskRun.getTaskId();
}

@Override
public String taskRunId() {
return this.taskRun.getId();
}

Expand Down
Loading

0 comments on commit 0d17a81

Please sign in to comment.