Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
fix: Aggregate EIP also contains steps
Browse files Browse the repository at this point in the history
Fixes #808
  • Loading branch information
Delawen committed Aug 3, 2023
1 parent 656ab1f commit 1ad630b
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 34 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ For more information and all configuration properties, see [Quarkus HTTP Referen
### Steps catalog resources

**Actual versions of resources:**
* Kaoto camel components: **94ef243f574ad42b85dafe59b3d36858c47fcd38**
* Kaoto camel components: **3ee2af43623923a5c5e09df6f3f70657e1ccd09f**
* Kaoto view definitions: **94aae37dee4356d51ac34bfb757eb43a85ad2c0a**
* Camel-connectors: **3.21.0**
* Camel-connectors: **3.21.0**
Expand Down
Binary file modified api/src/main/resources/camel-component-metadata.zip
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ spec:
simple: "${header.StockSymbol}"
aggregation-strategy: myAggregatorStrategy
completion-size: 2
steps:
- log:
id: log-2fdd
message: "${body}"
- load-balance:
weighted:
distribution-ratio: "2,1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,9 +513,6 @@ private FlowStep processStep(final Step step, final boolean to) {
flowStep = getCamelConnector(step, to);
} else if ("EIP".equalsIgnoreCase(step.getKind())) {
switch (step.getName()) {
case "aggregate":
flowStep = new AggregateFlowStep(step);
break;
case "bean":
flowStep = new BeanFlowStep(step);
break;
Expand Down Expand Up @@ -624,6 +621,9 @@ private FlowStep processStep(final Step step, final boolean to) {
}
} else if ("EIP-BRANCH".equalsIgnoreCase(step.getKind())) {
switch (step.getName()) {
case "aggregate":
flowStep = new AggregateFlowStep(step, this);
break;
case "circuit-breaker":
flowStep = new CircuitBreakerFlowStep(step, this);
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package io.kaoto.backend.camel.model.deployment.kamelet.step;

import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kaoto.backend.api.metadata.catalog.StepCatalog;
import io.kaoto.backend.camel.KamelHelper;
import io.kaoto.backend.camel.KamelPopulator;
import io.kaoto.backend.camel.model.deployment.kamelet.FlowStep;
import io.kaoto.backend.camel.model.deployment.kamelet.expression.Expression;
import io.kaoto.backend.camel.service.step.parser.kamelet.KameletStepParserService;
import io.kaoto.backend.model.parameter.Parameter;
import io.kaoto.backend.model.step.Step;

import java.util.List;
import java.util.Map;


Expand Down Expand Up @@ -72,97 +78,136 @@ public class Aggregate extends EIPStep {
public static final String COMPLETE_ALL_ON_STOP1 = "complete-all-on-stop";
public static final String DESCRIPTION_LABEL = KamelHelper.DESCRIPTION;

@JsonProperty("correlation-expression")
@JsonProperty(CORRELATION_EXPRESSION)
@JsonAlias(CORRELATION_EXPRESSION1)
private Expression correlationExpression;

@JsonProperty("completion-predicate")
@JsonProperty(COMPLETION_PREDICATE)
@JsonAlias(COMPLETION_PREDICATE1)
private String completionPredicate;

@JsonProperty("completion-timeout-expression")
@JsonProperty(COMPLETION_TIMEOUT_EXPRESSION)
@JsonAlias(COMPLETION_TIMEOUT_EXPRESSION1)
private Expression completionTimeoutExpression;

@JsonProperty("completion-size-expression")
@JsonProperty(COMPLETION_SIZE_EXPRESSION)
@JsonAlias(COMPLETION_SIZE_EXPRESSION1)
private String completionSizeExpression;

@JsonProperty("optimistic-lock-retry-policy")
@JsonProperty(OPTIMISTIC_LOCK_RETRY_POLICY)
@JsonAlias(OPTIMISTIC_LOCK_RETRY_POLICY1)
private String optimisticLockRetryPolicy;

@JsonProperty("parallel-processing")
@JsonProperty(PARALLEL_PROCESSING)
@JsonAlias(PARALLEL_PROCESSING1)
private Boolean parallelProcessing;

@JsonProperty("optimistic-locking")
@JsonProperty(OPTIMISTIC_LOCKING)
@JsonAlias(OPTIMISTIC_LOCKING1)
private String optimisticLocking;

@JsonProperty("executor-service")
@JsonProperty(EXECUTOR_SERVICE)
@JsonAlias(EXECUTOR_SERVICE1)
private String executorService;

@JsonProperty("timeout-checker-executor-service")
@JsonProperty(TIMEOUT_CHECKER_EXECUTOR_SERVICE)
@JsonAlias(TIMEOUT_CHECKER_EXECUTOR_SERVICE1)
private String timeoutCheckerExecutorService;

@JsonProperty("aggregate-controller")
@JsonProperty(AGGREGATE_CONTROLLER)
@JsonAlias(AGGREGATE_CONTROLLER1)
private String aggregateController;

@JsonProperty("aggregation-repository")
@JsonProperty(AGGREGATION_REPOSITORY)
@JsonAlias(AGGREGATION_REPOSITORY1)
private String aggregationRepository;

@JsonProperty("aggregation-strategy")
@JsonProperty(AGGREGATION_STRATEGY)
@JsonAlias(AGGREGATION_STRATEGY1)
private String aggregationStrategy;

@JsonProperty("aggregation-strategy-method-name")
@JsonProperty(AGGREGATION_STRATEGY_METHOD_NAME)
@JsonAlias(AGGREGATION_STRATEGY_METHOD_NAME1)
private String aggregationStrategyMethodName;

@JsonProperty("aggregation-strategy-method-allow-null")
@JsonProperty(AGGREGATION_STRATEGY_METHOD_ALLOW_NULL)
@JsonAlias(AGGREGATION_STRATEGY_METHOD_ALLOW_NULL1)
private Boolean aggregationStrategyMethodAllowNull;

@JsonProperty("completion-size")
@JsonProperty(COMPLETION_SIZE)
@JsonAlias(COMPLETION_SIZE1)
private Integer completionSize;

@JsonProperty("completion-interval")
@JsonProperty(COMPLETION_INTERVAL)
@JsonAlias(COMPLETION_INTERVAL1)
private String completionInterval;

@JsonProperty("completion-timeout")
@JsonProperty(COMPLETION_TIMEOUT)
@JsonAlias(COMPLETION_TIMEOUT1)
private String completionTimeout;

@JsonProperty("completion-timeout-checker-interval")
@JsonProperty(COMPLETION_TIMEOUT_CHECKER_INTERVAL)
@JsonAlias(COMPLETION_TIMEOUT_CHECKER_INTERVAL1)
private String completionTimeoutCheckerInterval;

@JsonProperty("completion-from-batch-consumer")
@JsonProperty(COMPLETION_FROM_BATCH_CONSUMER)
@JsonAlias(COMPLETION_FROM_BATCH_CONSUMER1)
private Boolean completionFromBatchConsumer;

@JsonProperty("completion-on-new-correlation-group")
@JsonProperty(COMPLETION_ON_NEW_CORRELATION_GROUP)
@JsonAlias(COMPLETION_ON_NEW_CORRELATION_GROUP1)
private Boolean completionOnNewCorrelationGroup;

@JsonProperty("eager-check-completion")
@JsonProperty(EAGER_CHECK_COMPLETION)
@JsonAlias(EAGER_CHECK_COMPLETION1)
private Boolean eagerCheckCompletion;

@JsonProperty("ignore-invalid-correlation-keys")
@JsonProperty(IGNORE_INVALID_CORRELATION_KEYS)
@JsonAlias(IGNORE_INVALID_CORRELATION_KEYS1)
private Boolean ignoreInvalidCorrelationKeys;

@JsonProperty("close-correlation-key-on-completion")
@JsonProperty(CLOSE_CORRELATION_KEY_ON_COMPLETION)
@JsonAlias(CLOSE_CORRELATION_KEY_ON_COMPLETION1)
private Integer closeCorrelationKeyOnCompletion;

@JsonProperty("discard-on-completion-timeout")
@JsonProperty(DISCARD_ON_COMPLETION_TIMEOUT)
@JsonAlias(DISCARD_ON_COMPLETION_TIMEOUT1)
private Boolean discardOnCompletionTimeout;

@JsonProperty("discard-on-aggregation-failure")
@JsonProperty(DISCARD_ON_AGGREGATION_FAILURE)
@JsonAlias(DISCARD_ON_AGGREGATION_FAILURE1)
private Boolean discardOnAggregationFailure;

@JsonProperty("force-completion-on-stop")
@JsonProperty(FORCE_COMPLETION_ON_STOP)
@JsonAlias(FORCE_COMPLETION_ON_STOP1)
private Boolean forceCompletionOnStop;

@JsonProperty("complete-all-on-stop")
@JsonProperty(COMPLETE_ALL_ON_STOP)
@JsonAlias(COMPLETE_ALL_ON_STOP1)
private Boolean completeAllOnStop;

@JsonProperty(KamelHelper.DESCRIPTION)
private String description;

@JsonProperty(KamelHelper.STEPS)
private List<FlowStep> steps;

public Aggregate() {
}

public Aggregate(Step step) {
public Aggregate(Step step, final KamelPopulator kameletPopulator) {
super(step);

if (step.getBranches() != null && !step.getBranches().isEmpty()) {
setSteps(kameletPopulator.processSteps(step.getBranches().get(0)));
}
}

@Override
protected void processBranches(final Step step, final StepCatalog catalog,
final KameletStepParserService kameletStepParserService) {
step.setBranches(List.of(createBranch(KamelHelper.STEPS, this.getSteps(), kameletStepParserService)));
}

public Map<String, Object> getRepresenterProperties() {
Map<String, Object> properties = super.getDefaultRepresenterProperties();
Expand Down Expand Up @@ -250,6 +295,9 @@ public Map<String, Object> getRepresenterProperties() {
if (this.description != null) {
properties.put(DESCRIPTION_LABEL, this.description);
}
if (this.getSteps() != null) {
properties.put(KamelHelper.STEPS, this.getSteps());
}

return properties;
}
Expand Down Expand Up @@ -495,6 +543,8 @@ protected void assignProperty(final Parameter parameter) {
}
}



public Expression getCorrelationExpression() {
return correlationExpression;
}
Expand Down Expand Up @@ -718,4 +768,12 @@ public String getDescription() {
public void setDescription(final String description) {
this.description = description;
}

public List<FlowStep> getSteps() {
return steps;
}

public void setSteps(List<FlowStep> steps) {
this.steps = steps;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.kaoto.backend.api.metadata.catalog.StepCatalog;
import io.kaoto.backend.camel.KamelPopulator;
import io.kaoto.backend.camel.model.deployment.kamelet.FlowStep;
import io.kaoto.backend.camel.service.step.parser.kamelet.KameletStepParserService;
import io.kaoto.backend.model.step.Step;
Expand All @@ -32,8 +33,8 @@ public class AggregateFlowStep implements FlowStep {
public AggregateFlowStep() {
}

public AggregateFlowStep(Step step) {
setAggregate(new Aggregate(step));
public AggregateFlowStep(final Step step, final KamelPopulator kameletPopulator) {
setAggregate(new Aggregate(step, kameletPopulator));
}

@Override
Expand Down
Binary file modified camel-support/src/test/resources/camel-component-metadata.zip
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ spec:
simple: "${header.StockSymbol}"
aggregation-strategy: myAggregatorStrategy
completion-size: 2
steps:
- log:
id: log-2fdd
message: "${body}"
- service-call:
name: sc
static-service-discovery:
Expand Down
Empty file modified update-resources.sh
100644 → 100755
Empty file.

0 comments on commit 1ad630b

Please sign in to comment.