diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/error/RestartException.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/error/RestartException.java new file mode 100644 index 0000000000..e2d3d9f590 --- /dev/null +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/error/RestartException.java @@ -0,0 +1,33 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.camunda.connector.runtime.core.error; + +import java.time.Duration; + +public class RestartException extends RuntimeException { + + private final Duration backoffTime; + + public RestartException(String message, Throwable cause, Duration backoffTime) { + super(message, cause); + this.backoffTime = backoffTime; + } + + public Duration getBackoffTime() { + return backoffTime; + } +} diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java index e1db3f77bf..9a4f0336ba 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.EvictingQueue; +import io.camunda.connector.api.inbound.Activity; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext; @@ -28,6 +29,7 @@ import io.camunda.document.factory.DocumentFactory; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.time.Duration; import java.util.function.Consumer; public class DefaultInboundConnectorContextFactory implements InboundConnectorContextFactory { @@ -57,8 +59,9 @@ public DefaultInboundConnectorContextFactory( public > InboundConnectorContext createContext( final ValidInboundConnectorDetails connectorDetails, final Consumer cancellationCallback, + final Consumer reactivationCallback, final Class executableClass, - final EvictingQueue queue) { + final EvictingQueue queue) { InboundConnectorReportingContext inboundContext = new InboundConnectorContextImpl( @@ -68,6 +71,7 @@ public > InboundConnectorContext createC connectorDetails, correlationHandler, cancellationCallback, + reactivationCallback, objectMapper, queue); diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java index f29c01184a..b9dcd413b8 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java @@ -17,9 +17,11 @@ package io.camunda.connector.runtime.core.inbound; import com.google.common.collect.EvictingQueue; +import io.camunda.connector.api.inbound.Activity; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails; +import java.time.Duration; import java.util.function.Consumer; /** @@ -50,6 +52,7 @@ public interface InboundConnectorContextFactory { > InboundConnectorContext createContext( final ValidInboundConnectorDetails connectorDetails, final Consumer cancellationCallback, + final Consumer reactivationCallback, final Class executableClass, - final EvictingQueue queue); + final EvictingQueue queue); } diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java index 4720fb5ecd..f0c051763f 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java @@ -28,6 +28,7 @@ import io.camunda.connector.api.validation.ValidationProvider; import io.camunda.connector.feel.FeelEngineWrapperException; import io.camunda.connector.runtime.core.AbstractConnectorContext; +import io.camunda.connector.runtime.core.error.RestartException; import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails; @@ -36,6 +37,7 @@ import io.camunda.document.factory.DocumentFactoryImpl; import io.camunda.document.store.DocumentCreationRequest; import io.camunda.document.store.InMemoryDocumentStore; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Objects; @@ -56,6 +58,7 @@ public class InboundConnectorContextImpl extends AbstractConnectorContext private final ObjectMapper objectMapper; private final Consumer cancellationCallback; + private final Consumer reactivationCallback; private final EvictingQueue logs; private final DocumentFactory documentFactory; private Health health = Health.unknown(); @@ -68,8 +71,9 @@ public InboundConnectorContextImpl( ValidInboundConnectorDetails connectorDetails, InboundCorrelationHandler correlationHandler, Consumer cancellationCallback, + Consumer reactivationCallback, ObjectMapper objectMapper, - EvictingQueue logs) { + EvictingQueue logs) { super(secretProvider, validationProvider); this.documentFactory = documentFactory; this.correlationHandler = correlationHandler; @@ -79,6 +83,7 @@ public InboundConnectorContextImpl( connectorDetails.rawPropertiesWithoutKeywords()); this.objectMapper = objectMapper; this.cancellationCallback = cancellationCallback; + this.reactivationCallback = reactivationCallback; this.logs = logs; } @@ -88,6 +93,7 @@ public InboundConnectorContextImpl( ValidInboundConnectorDetails connectorDetails, InboundCorrelationHandler correlationHandler, Consumer cancellationCallback, + Consumer reactivationCallback, ObjectMapper objectMapper, EvictingQueue logs) { this( @@ -97,6 +103,7 @@ public InboundConnectorContextImpl( connectorDetails, correlationHandler, cancellationCallback, + reactivationCallback, objectMapper, logs); } @@ -125,6 +132,9 @@ public void cancel(Throwable exception) { } catch (Throwable e) { LOG.error("Failed to deliver the cancellation signal to the runtime", e); } + if (Objects.requireNonNull(exception) instanceof RestartException restartException) { + reactivationCallback.accept(restartException.getBackoffTime()); + } } @Override diff --git a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactoryTest.java b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactoryTest.java index dea238d025..c179b1e170 100644 --- a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactoryTest.java +++ b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactoryTest.java @@ -28,6 +28,7 @@ import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails; import io.camunda.connector.runtime.core.secret.SecretProviderAggregator; import io.camunda.document.factory.DocumentFactory; +import java.time.Duration; import java.util.function.Consumer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,6 +45,7 @@ class DefaultInboundConnectorContextFactoryTest { @Mock private ValidationProvider validationProvider; @Mock private OperateClientAdapter operateClientAdapter; @Mock private Consumer cancellationCallback; + @Mock private Consumer reactivationCallback; @Mock private ValidInboundConnectorDetails newConnector; @Mock private DocumentFactory documentFactory; private DefaultInboundConnectorContextFactory factory; @@ -66,6 +68,7 @@ void shouldCreateInboundConnectorContext() { factory.createContext( newConnector, cancellationCallback, + reactivationCallback, ExecutableWithInboundContext.class, EvictingQueue.create(10)); @@ -78,6 +81,7 @@ void shouldCreateInboundConnectorContextWhenParameterizedTypeIsEmpty() { factory.createContext( newConnector, cancellationCallback, + reactivationCallback, ExecutableWithEmptyParameterizedType.class, EvictingQueue.create(10)); @@ -91,6 +95,7 @@ void shouldCreateInboundIntermediateConnectorContext() { factory.createContext( newConnector, cancellationCallback, + reactivationCallback, ExecutableWithIntermediate.class, EvictingQueue.create(10)); diff --git a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImplTest.java b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImplTest.java index d0b01f2dbe..36dd726e09 100644 --- a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImplTest.java +++ b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImplTest.java @@ -41,6 +41,21 @@ class InboundConnectorContextImplTest { private final SecretProvider secretProvider = new FooBarSecretProvider(); private final ObjectMapper mapper = ConnectorsObjectMapperSupplier.DEFAULT_MAPPER; + @NotNull + private static ValidInboundConnectorDetails getInboundConnectorDefinition( + Map properties) { + properties = new HashMap<>(properties); + properties.put("inbound.type", "io.camunda:connector:1"); + InboundConnectorElement element = + new InboundConnectorElement( + properties, + new StandaloneMessageCorrelationPoint("", "", null, null), + new ProcessElement("bool", 0, 0, "id", "")); + var details = InboundConnectorDetails.of(element.deduplicationId(List.of()), List.of(element)); + assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class); + return (ValidInboundConnectorDetails) details; + } + @Test void bindProperties_shouldThrowExceptionWhenWrongFormat() { // given @@ -52,6 +67,7 @@ void bindProperties_shouldThrowExceptionWhenWrongFormat() { definition, null, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); // when and then @@ -73,6 +89,7 @@ void bindProperties_shouldParseNullValue() { definition, null, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); // when @@ -98,6 +115,7 @@ void bindProperties_shouldParseStringAsString() { definition, null, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); // when @@ -108,21 +126,6 @@ void bindProperties_shouldParseStringAsString() { .isInstanceOf(String.class); } - @NotNull - private static ValidInboundConnectorDetails getInboundConnectorDefinition( - Map properties) { - properties = new HashMap<>(properties); - properties.put("inbound.type", "io.camunda:connector:1"); - InboundConnectorElement element = - new InboundConnectorElement( - properties, - new StandaloneMessageCorrelationPoint("", "", null, null), - new ProcessElement("bool", 0, 0, "id", "")); - var details = InboundConnectorDetails.of(element.deduplicationId(List.of()), List.of(element)); - assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class); - return (ValidInboundConnectorDetails) details; - } - @Test void bindProperties_shouldParseAllObject() { // Given @@ -156,6 +159,7 @@ void bindProperties_shouldParseAllObject() { definition, null, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); // when @@ -177,6 +181,7 @@ void getProperties_shouldNotParseFeel() { definition, null, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -265,8 +270,6 @@ public void setBool(final boolean bool) { this.bool = bool; } - public record InnerObject(List stringList, boolean bool) {} - @Override public boolean equals(final Object o) { if (this == o) return true; @@ -325,5 +328,7 @@ public String toString() { + bool + "}"; } + + public record InnerObject(List stringList, boolean bool) {} } } diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java index c0f4e70e4b..61752c0863 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java @@ -18,6 +18,7 @@ import com.google.common.collect.EvictingQueue; import io.camunda.connector.api.inbound.Health; +import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; import io.camunda.connector.runtime.core.inbound.InboundConnectorContextFactory; @@ -33,14 +34,15 @@ import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry; import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound; import io.camunda.zeebe.spring.client.metrics.MetricsRecorder; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -49,15 +51,14 @@ public class BatchExecutableProcessor { private static final Logger LOG = LoggerFactory.getLogger(BatchExecutableProcessor.class); - - @Value("${camunda.connector.inbound.log.size:10}") - private int inboundLogsSize; - private final InboundConnectorFactory connectorFactory; private final InboundConnectorContextFactory connectorContextFactory; private final MetricsRecorder metricsRecorder; private final WebhookConnectorRegistry webhookConnectorRegistry; + @Value("${camunda.connector.inbound.log.size:10}") + private int inboundLogsSize; + public BatchExecutableProcessor( InboundConnectorFactory connectorFactory, InboundConnectorContextFactory connectorContextFactory, @@ -76,7 +77,8 @@ public BatchExecutableProcessor( */ public Map activateBatch( Map request, - BiConsumer cancellationCallback) { + Function> cancellationCallbackMaker, + Function> reactivationCallbackMaker) { final Map alreadyActivated = new HashMap<>(); @@ -94,7 +96,8 @@ public Map activateBatch( } final RegisteredExecutable result = - activateSingle(data, e -> cancellationCallback.accept(e, id)); + activateSingle( + data, cancellationCallbackMaker.apply(id), reactivationCallbackMaker.apply(id)); switch (result) { case Activated activated -> alreadyActivated.put(id, activated); @@ -138,14 +141,16 @@ public Map activateBatch( } private RegisteredExecutable activateSingle( - InboundConnectorDetails data, Consumer cancellationCallback) { + InboundConnectorDetails data, + Consumer cancellationCallback, + Consumer reactivationCallback) { if (data instanceof InvalidInboundConnectorDetails invalid) { return new InvalidDefinition(invalid, invalid.error().getMessage()); } var validData = (ValidInboundConnectorDetails) data; - final InboundConnectorExecutable executable; + final InboundConnectorExecutable executable; final InboundConnectorReportingContext context; try { @@ -155,6 +160,7 @@ private RegisteredExecutable activateSingle( connectorContextFactory.createContext( validData, cancellationCallback, + reactivationCallback, executable.getClass(), EvictingQueue.create(inboundLogsSize)); } catch (NoSuchElementException e) { diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java index c6fa9f57d7..3765e5197c 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java @@ -28,18 +28,16 @@ import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.ConnectorNotRegistered; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.FailedToActivate; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.InvalidDefinition; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.concurrent.*; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,18 +45,72 @@ public class InboundExecutableRegistryImpl implements InboundExecutableRegistry { + private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class); + final Map executables = new HashMap<>(); private final BlockingQueue eventQueue; + private final ScheduledExecutorService reactivationScheduler = + Executors.newSingleThreadScheduledExecutor(); private final ExecutorService executorService; - private final BatchExecutableProcessor batchExecutableProcessor; - private final Map executablesByElement = new ConcurrentHashMap<>(); - final Map executables = new HashMap<>(); - - private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class); - private final Map> deduplicationScopesByType; + private final Function> cancellationCallbackMaker = + id -> + (throwable) -> { + LOG.warn("Inbound connector executable has requested its cancellation", throwable); + var toCancel = executables.get(id); + if (toCancel == null) { + LOG.error("Inbound connector executable not found for the given ID: {}", id); + return; + } + if (toCancel instanceof Activated activated) { + activated.context().reportHealth(Health.down(throwable)); + try { + activated.executable().deactivate(); + } catch (Exception e) { + LOG.error("Failed to deactivate connector", e); + } + } else { + LOG.error( + "Attempted to cancel an inbound connector executable that is not in the active state: {}", + id); + } + }; + + private final Function> reactivationCallbackMaker = + (id) -> + (duration) -> { + LOG.warn( + "Inbound connector executable has requested its reactivation in {} {}", + duration.getSeconds(), + TimeUnit.SECONDS); + var toReactivate = executables.get(id); + if (toReactivate == null) { + LOG.error( + "Inbound connector executable not found for reactivation, received ID: {}", id); + return; + } + if (toReactivate instanceof Activated activated) { + reactivationScheduler.schedule( + () -> { + try { + activated.executable().activate(activated.context()); + activated.context().reportHealth(Health.up()); + } catch (Exception e) { + LOG.error("Failed to reactivate connector {}", id, e); + } + }, + duration.getSeconds(), + TimeUnit.SECONDS); + + } else { + LOG.error( + "Attempted to reactivate an inbound connector executable that has never been activated: {}", + id); + } + }; + public InboundExecutableRegistryImpl( InboundConnectorFactory connectorFactory, BatchExecutableProcessor batchExecutableProcessor) { this.batchExecutableProcessor = batchExecutableProcessor; @@ -92,28 +144,6 @@ public void publishEvent(InboundExecutableEvent event) { LOG.debug("Event added to the queue: {}", event); } - private final BiConsumer cancellationCallback = - (throwable, id) -> { - LOG.warn("Inbound connector executable has requested its cancellation", throwable); - var toCancel = executables.get(id); - if (toCancel == null) { - LOG.error("Inbound connector executable not found for the given ID: {}", id); - return; - } - if (toCancel instanceof Activated activated) { - activated.context().reportHealth(Health.down(throwable)); - try { - activated.executable().deactivate(); - } catch (Exception e) { - LOG.error("Failed to deactivate connector", e); - } - } else { - LOG.error( - "Attempted to cancel an inbound connector executable that is not in the active state: {}", - id); - } - }; - void handleEvent(InboundExecutableEvent event) { switch (event) { case InboundExecutableEvent.Activated activated -> handleActivated(activated); @@ -148,7 +178,8 @@ private void handleActivated(InboundExecutableEvent.Activated activated) { .forEach(element -> executablesByElement.put(element.element(), id))); var activationResult = - batchExecutableProcessor.activateBatch(groupedConnectors, cancellationCallback); + batchExecutableProcessor.activateBatch( + groupedConnectors, cancellationCallbackMaker, reactivationCallbackMaker); executables.putAll(activationResult); } catch (Exception e) { diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java index 22b57b1e2b..32a34c8bd0 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java @@ -16,6 +16,7 @@ */ package io.camunda.connector.runtime.inbound.executable; +import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.runtime.core.inbound.InboundConnectorReportingContext; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails; @@ -25,7 +26,8 @@ public sealed interface RegisteredExecutable { record Activated( - InboundConnectorExecutable executable, InboundConnectorReportingContext context) + InboundConnectorExecutable executable, + InboundConnectorReportingContext context) implements RegisteredExecutable {} record ConnectorNotRegistered(ValidInboundConnectorDetails data) diff --git a/connector-runtime/connector-runtime-spring/src/test/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryTest.java b/connector-runtime/connector-runtime-spring/src/test/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryTest.java index 6b0bc4b8b2..5ca1de7b38 100644 --- a/connector-runtime/connector-runtime-spring/src/test/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryTest.java +++ b/connector-runtime/connector-runtime-spring/src/test/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryTest.java @@ -123,7 +123,7 @@ public void validScenario_shouldActivateNormally() throws Exception { new ProcessElement("id", 0, 0, elementId, "tenant")); var executable = mock(InboundConnectorExecutable.class); var context = mock(InboundConnectorContextImpl.class); - when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(context); + when(contextFactory.createContext(any(), any(), any(), any(), any())).thenReturn(context); when(factory.getInstance(any())).thenReturn(executable); @@ -148,7 +148,7 @@ public void activationFailure_shouldYieldFailedToActivate() throws Exception { doThrow(new RuntimeException("failed")).when(executable).activate(any()); var mockContext = mock(InboundConnectorContextImpl.class); - when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext); + when(contextFactory.createContext(any(), any(), any(), any(), any())).thenReturn(mockContext); doThrow(new RuntimeException("failed")).when(executable).activate(any()); @@ -185,7 +185,7 @@ public void activationFailure_batch_shouldRollbackOtherConnectors() throws Excep when(mockContext.getDefinition()) .thenReturn(new InboundConnectorDefinition("type", "tenant", "id", null)); when(mockContext.getHealth()).thenReturn(Health.up()); - when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext); + when(contextFactory.createContext(any(), any(), any(), any(), any())).thenReturn(mockContext); doThrow(new RuntimeException("failed")).when(executable2).activate(mockContext); @@ -232,7 +232,7 @@ public void connectorNotFound_batch_shouldNotRollbackOtherConnectors() throws Ex when(mockContext.getDefinition()) .thenReturn(new InboundConnectorDefinition("type", "tenant", "id", null)); when(mockContext.getHealth()).thenReturn(Health.up()); - when(contextFactory.createContext(any(), any(), any(), any())).thenReturn(mockContext); + when(contextFactory.createContext(any(), any(), any(), any(), any())).thenReturn(mockContext); // when registry.handleEvent(new Activated("tenant", 0, List.of(element1, element2))); diff --git a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java index 03d2b6fbb4..e8708ca473 100644 --- a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java +++ b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerPlainJavaTests.java @@ -56,6 +56,54 @@ public class WebhookControllerPlainJavaTests { private static final ObjectMapper mapper = ConnectorsObjectMapperSupplier.DEFAULT_MAPPER; + private static long nextProcessDefinitionKey = 0L; + + public static RegisteredExecutable.Activated buildConnector( + ValidInboundConnectorDetails connectorData) { + WebhookConnectorExecutable executable = mock(WebhookConnectorExecutable.class); + try { + Mockito.when(executable.triggerWebhook(any(WebhookProcessingPayload.class))) + .thenReturn(mock(WebhookResult.class)); + } catch (Exception e) { + throw new RuntimeException(e); + } + return new RegisteredExecutable.Activated(executable, buildContext(connectorData)); + } + + public static InboundConnectorContextImpl buildContext(ValidInboundConnectorDetails def) { + var context = + new InboundConnectorContextImpl( + name -> null, + new DefaultValidationProvider(), + def, + mock(InboundCorrelationHandler.class), + e -> {}, + (d) -> {}, + mapper, + EvictingQueue.create(10)); + + return spy(context); + } + + public static ValidInboundConnectorDetails webhookDefinition( + String bpmnProcessId, int version, String path) { + var details = + InboundConnectorDetails.of( + bpmnProcessId + version + path, + List.of(webhookElement(++nextProcessDefinitionKey, bpmnProcessId, version, path))); + assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class); + return (ValidInboundConnectorDetails) details; + } + + public static InboundConnectorElement webhookElement( + long processDefinitionKey, String bpmnProcessId, int version, String path) { + + return new InboundConnectorElement( + Map.of("inbound.type", "io.camunda:webhook:1", "inbound.context", path), + new StartEventCorrelationPoint(bpmnProcessId, version, processDefinitionKey), + new ProcessElement( + bpmnProcessId, version, processDefinitionKey, "testElement", "testTenantId")); + } @Test public void multipleWebhooksOnSameContextPathAreNotSupported() { @@ -136,52 +184,4 @@ public void webhookDeactivation_samePathButDifferentConnector_shouldFail() { assertFalse(webhook.isRegistered(processA2)); assertTrue(webhook.isRegistered(processA1)); } - - private static long nextProcessDefinitionKey = 0L; - - public static RegisteredExecutable.Activated buildConnector( - ValidInboundConnectorDetails connectorData) { - WebhookConnectorExecutable executable = mock(WebhookConnectorExecutable.class); - try { - Mockito.when(executable.triggerWebhook(any(WebhookProcessingPayload.class))) - .thenReturn(mock(WebhookResult.class)); - } catch (Exception e) { - throw new RuntimeException(e); - } - return new RegisteredExecutable.Activated(executable, buildContext(connectorData)); - } - - public static InboundConnectorContextImpl buildContext(ValidInboundConnectorDetails def) { - var context = - new InboundConnectorContextImpl( - name -> null, - new DefaultValidationProvider(), - def, - mock(InboundCorrelationHandler.class), - e -> {}, - mapper, - EvictingQueue.create(10)); - - return spy(context); - } - - public static ValidInboundConnectorDetails webhookDefinition( - String bpmnProcessId, int version, String path) { - var details = - InboundConnectorDetails.of( - bpmnProcessId + version + path, - List.of(webhookElement(++nextProcessDefinitionKey, bpmnProcessId, version, path))); - assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class); - return (ValidInboundConnectorDetails) details; - } - - public static InboundConnectorElement webhookElement( - long processDefinitionKey, String bpmnProcessId, int version, String path) { - - return new InboundConnectorElement( - Map.of("inbound.type", "io.camunda:webhook:1", "inbound.context", path), - new StartEventCorrelationPoint(bpmnProcessId, version, processDefinitionKey), - new ProcessElement( - bpmnProcessId, version, processDefinitionKey, "testElement", "testTenantId")); - } } diff --git a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java index 09bb8a652a..19098992e6 100644 --- a/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java +++ b/connector-runtime/spring-boot-starter-camunda-connectors/src/test/java/io/camunda/connector/runtime/inbound/WebhookControllerTestZeebeTests.java @@ -106,6 +106,7 @@ public void testSuccessfulProcessingWithActivation() throws Exception { webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -147,6 +148,7 @@ public void testSuccessfulProcessingWithActivationAndStrictResponse() throws Exc webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -194,6 +196,7 @@ public void testSuccessfulProcessingWithFailedActivation_NoConsumeUnmatched() th webhookDef, correlationHandlerMock, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -235,6 +238,7 @@ public void testSuccessfulProcessingWithFailedActivation_ConsumeUnmatched() thro webhookDef, correlationHandlerMock, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -284,6 +288,7 @@ public void testSuccessfulProcessingWithDuplicateMessage() throws Exception { webhookDef, correlationHandlerMock, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -322,6 +327,7 @@ public void testSuccessfulProcessingWithActivationCorrelationHidden() throws Exc webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -360,6 +366,7 @@ public void testSuccessfulProcessingWithErrorDuringActivation() throws Exception webhookDefinition, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -393,6 +400,7 @@ public void testErrorDuringProcessing() throws Exception { webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -428,6 +436,7 @@ public void testFeelExpressionErrorDuringProcessing() throws Exception { webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -451,8 +460,6 @@ public void testFeelExpressionErrorDuringProcessing() throws Exception { assertEquals("expression", responseEntity.getBody().expression()); } - interface MyVerifiableWebhook extends WebhookConnectorExecutable {} - @Test @SuppressWarnings("unchecked") public void testSuccessfulProcessingWithVerification() throws Exception { @@ -475,6 +482,7 @@ public void testSuccessfulProcessingWithVerification() throws Exception { webhookDef, correlationHandler, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -533,6 +541,7 @@ public void testSuccessfulProcessingWithResponseBodyExpression() throws Exceptio webhookDef, correlationHandlerMock, (e) -> {}, + (d) -> {}, mapper, EvictingQueue.create(10)); @@ -565,4 +574,6 @@ public void deployProcess(String bpmnProcessId) { .send() .join(); } + + interface MyVerifiableWebhook extends WebhookConnectorExecutable {} } diff --git a/connectors/http/polling/src/test/java/io/camunda/connector/http/polling/model/PollingIntervalConfigurationTest.java b/connectors/http/polling/src/test/java/io/camunda/connector/http/polling/model/PollingIntervalConfigurationTest.java index 204692204a..8950408283 100644 --- a/connectors/http/polling/src/test/java/io/camunda/connector/http/polling/model/PollingIntervalConfigurationTest.java +++ b/connectors/http/polling/src/test/java/io/camunda/connector/http/polling/model/PollingIntervalConfigurationTest.java @@ -46,6 +46,7 @@ public void setUp() { connectorData, null, (e) -> {}, + (d) -> {}, ConnectorsObjectMapperSupplier.getCopy(), EvictingQueue.create(10)); }