Skip to content

Commit

Permalink
feat(core): Added namespaces property to the Count task (#5295)
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye authored Oct 3, 2024
1 parent 216a083 commit f93013a
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ public class ExecutionCount {
@NotNull
String namespace;

@NotNull
String flowId;

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ class FlowFilter {

List<ExecutionCount> executionCounts(
@Nullable String tenantId,
List<Flow> flows,
@Nullable List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
);
@Nullable ZonedDateTime endDate,
@Nullable List<String> namespaces);

Execution save(Execution execution);

Expand Down
26 changes: 18 additions & 8 deletions core/src/main/java/io/kestra/plugin/core/execution/Count.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.slf4j.Logger;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import static io.kestra.core.utils.Rethrow.throwPredicate;

Expand Down Expand Up @@ -76,10 +74,9 @@
aliases = "io.kestra.core.tasks.executions.Counts"
)
public class Count extends Task implements RunnableTask<Count.Output> {
@NotNull
@NotEmpty
@Schema(
title = "A list of flows to be filtered."
title = "A list of flows to be filtered.",
description = "If not provided, namespaces must be set."
)
@PluginProperty
protected List<Flow> flows;
Expand Down Expand Up @@ -114,25 +111,38 @@ public class Count extends Task implements RunnableTask<Count.Output> {
@PluginProperty(dynamic = true)
protected String expression;

@PluginProperty
protected List<String> namespaces;

@Override
public Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
ExecutionRepositoryInterface executionRepository = ((DefaultRunContext)runContext)
.getApplicationContext()
.getBean(ExecutionRepositoryInterface.class);

if (flows == null && namespaces == null) {
throw new IllegalArgumentException("You must provide a list of flows or namespaces");
}

var flowInfo = runContext.flowInfo();

// check that all flows are allowed
FlowService flowService = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowService.class);
flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
if (flows != null) {
flows.forEach(flow -> flowService.checkAllowedNamespace(flowInfo.tenantId(), flow.getNamespace(), flowInfo.tenantId(), flowInfo.namespace()));
}
if (namespaces != null) {
namespaces.forEach(namespace -> flowService.checkAllowedNamespace(flowInfo.tenantId(), namespace, flowInfo.tenantId(), flowInfo.namespace()));
}

List<ExecutionCount> executionCounts = executionRepository.executionCounts(
flowInfo.tenantId(),
flows,
this.states,
startDate != null ? ZonedDateTime.parse(runContext.render(startDate)) : null,
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null
endDate != null ? ZonedDateTime.parse(runContext.render(endDate)) : null,
namespaces
);

logger.trace("{} flows matching filters", executionCounts.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ protected void executionsCount() throws InterruptedException {
),
null,
ZonedDateTime.now().minusDays(10),
ZonedDateTime.now()
ZonedDateTime.now(),
null
);
assertThat(result.size(), is(4));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(2L));
Expand All @@ -608,12 +609,24 @@ protected void executionsCount() throws InterruptedException {
),
List.of(State.Type.SUCCESS),
null,
null,
null
);
assertThat(result.size(), is(3));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("first")).findFirst().get().getCount(), is(2L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("second")).findFirst().get().getCount(), is(3L));
assertThat(result.stream().filter(executionCount -> executionCount.getFlowId().equals("third")).findFirst().get().getCount(), is(9L));

result = executionRepository.executionCounts(
null,
null,
null,
null,
null,
List.of(NAMESPACE)
);
assertThat(result.size(), is(1));
assertThat(result.stream().filter(executionCount -> executionCount.getNamespace().equals(NAMESPACE)).findFirst().get().getCount(), is(14L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.kestra.core.runners.Executor;
import io.kestra.core.runners.ExecutorState;
import io.kestra.core.utils.DateUtils;
import io.kestra.core.utils.ListUtils;
import io.kestra.core.utils.NamespaceUtils;
import io.kestra.jdbc.runner.AbstractJdbcExecutorStateStorage;
import io.kestra.jdbc.runner.JdbcIndexerInterface;
Expand Down Expand Up @@ -99,7 +100,9 @@ public Boolean isTaskRunEnabled() {
return false;
}

/** {@inheritDoc} **/
/**
* {@inheritDoc}
**/
@Override
public Flux<Execution> findAllByTriggerExecutionId(String tenantId,
String triggerExecutionId) {
Expand Down Expand Up @@ -804,8 +807,8 @@ public List<ExecutionCount> executionCounts(
List<Flow> flows,
@Nullable List<State.Type> states,
@Nullable ZonedDateTime startDate,
@Nullable ZonedDateTime endDate
) {
@Nullable ZonedDateTime endDate,
@Nullable List<String> namespaces) {
ZonedDateTime finalStartDate = startDate == null ? ZonedDateTime.now().minusDays(30) : startDate;
ZonedDateTime finalEndDate = endDate == null ? ZonedDateTime.now() : endDate;

Expand Down Expand Up @@ -835,16 +838,24 @@ public List<ExecutionCount> executionCounts(
select = select.and(this.statesFilter(states));
}

// add flow & namespace filters
select = select.and(DSL.or(
flows
List<Condition> orConditions = new ArrayList<>();
orConditions.addAll(ListUtils.emptyOnNull(flows)
.stream()
.map(flow -> DSL.and(
field("namespace").eq(flow.getNamespace()),
field("flow_id").eq(flow.getFlowId())
))
.toList());

orConditions.addAll(
ListUtils.emptyOnNull(namespaces)
.stream()
.map(flow -> DSL.and(
field("namespace").eq(flow.getNamespace()),
field("flow_id").eq(flow.getFlowId())
))
.map(np -> field("namespace").eq(np))
.toList()
));
);

// add flows filters
select = select.and(DSL.or(orConditions));

// map result to flow
return select
Expand All @@ -865,22 +876,40 @@ public List<ExecutionCount> executionCounts(
.toList();
});

List<ExecutionCount> counts = new ArrayList<>();
// fill missing with count at 0
return flows
.stream()
.map(flow -> result
if (flows != null) {
counts.addAll(flows
.stream()
.filter(executionCount -> executionCount.getNamespace().equals(flow.getNamespace()) &&
executionCount.getFlowId().equals(flow.getFlowId())
.map(flow -> result
.stream()
.filter(executionCount -> executionCount.getNamespace().equals(flow.getNamespace()) &&
executionCount.getFlowId().equals(flow.getFlowId())
)
.findFirst()
.orElse(new ExecutionCount(
flow.getNamespace(),
flow.getFlowId(),
0L
))
)
.findFirst()
.orElse(new ExecutionCount(
flow.getNamespace(),
flow.getFlowId(),
0L
))
)
.toList();
.toList());
}

if (namespaces != null) {
Map<String, Long> groupedByNamespace = result.stream()
.collect(Collectors.groupingBy(
ExecutionCount::getNamespace,
Collectors.summingLong(ExecutionCount::getCount)
));

counts.addAll(groupedByNamespace.entrySet()
.stream()
.map(entry -> new ExecutionCount(entry.getKey(), null, entry.getValue()))
.toList());
}

return counts;
}

public List<Execution> lastExecutions(
Expand Down
1 change: 1 addition & 0 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private void executionQueue(Either<Execution, DeserializationException> either)
List.of(new io.kestra.core.models.executions.statistics.Flow(flow.getNamespace(), flow.getId())),
List.of(State.Type.RUNNING, State.Type.PAUSED),
null,
null,
null
).getFirst();

Expand Down

0 comments on commit f93013a

Please sign in to comment.