diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java index e1db3f77bf..21c7c31f07 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/DefaultInboundConnectorContextFactory.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.EvictingQueue; +import io.camunda.connector.api.inbound.Activity; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext; @@ -58,7 +59,7 @@ public > InboundConnectorContext createC final ValidInboundConnectorDetails connectorDetails, final Consumer cancellationCallback, final Class executableClass, - final EvictingQueue queue) { + final EvictingQueue queue) { InboundConnectorReportingContext inboundContext = new InboundConnectorContextImpl( diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java index f29c01184a..0321ee7458 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextFactory.java @@ -17,6 +17,7 @@ package io.camunda.connector.runtime.core.inbound; import com.google.common.collect.EvictingQueue; +import io.camunda.connector.api.inbound.Activity; import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails; @@ -51,5 +52,5 @@ > InboundConnectorContext createContext( final ValidInboundConnectorDetails connectorDetails, final Consumer cancellationCallback, final Class executableClass, - final EvictingQueue queue); + final EvictingQueue queue); } diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java index 4720fb5ecd..4af34c1814 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/InboundConnectorContextImpl.java @@ -69,7 +69,7 @@ public InboundConnectorContextImpl( InboundCorrelationHandler correlationHandler, Consumer cancellationCallback, ObjectMapper objectMapper, - EvictingQueue logs) { + EvictingQueue logs) { super(secretProvider, validationProvider); this.documentFactory = documentFactory; this.correlationHandler = correlationHandler; @@ -89,7 +89,7 @@ public InboundConnectorContextImpl( InboundCorrelationHandler correlationHandler, Consumer cancellationCallback, ObjectMapper objectMapper, - EvictingQueue logs) { + EvictingQueue logs) { this( secretProvider, validationProvider, diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java index c0f4e70e4b..22feb78ab3 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/BatchExecutableProcessor.java @@ -18,6 +18,7 @@ import com.google.common.collect.EvictingQueue; import io.camunda.connector.api.inbound.Health; +import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable; import io.camunda.connector.runtime.core.inbound.InboundConnectorContextFactory; @@ -39,7 +40,6 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; -import java.util.function.BiConsumer; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +75,7 @@ public BatchExecutableProcessor( * considered valid). */ public Map activateBatch( - Map request, - BiConsumer cancellationCallback) { + Map request, CancellationManager cancellationManager) { final Map alreadyActivated = new HashMap<>(); @@ -94,12 +93,13 @@ public Map activateBatch( } final RegisteredExecutable result = - activateSingle(data, e -> cancellationCallback.accept(e, id)); + activateSingle(data, cancellationManager.createCancellationCallback(id)); switch (result) { case Activated activated -> alreadyActivated.put(id, activated); case ConnectorNotRegistered notRegistered -> alreadyActivated.put(id, notRegistered); case InvalidDefinition invalid -> alreadyActivated.put(id, invalid); + case RegisteredExecutable.Cancelled cancelled -> alreadyActivated.put(id, cancelled); case FailedToActivate failed -> { LOG.error( "Failed to activate connector of type '{}' with deduplication ID '{}', reason: {}. " @@ -145,7 +145,7 @@ private RegisteredExecutable activateSingle( } var validData = (ValidInboundConnectorDetails) data; - final InboundConnectorExecutable executable; + final InboundConnectorExecutable executable; final InboundConnectorReportingContext context; try { diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/CancellationManager.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/CancellationManager.java new file mode 100644 index 0000000000..a60ea67d23 --- /dev/null +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/CancellationManager.java @@ -0,0 +1,130 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH + * under one or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. Camunda licenses this file to you under the Apache License, + * Version 2.0; you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.camunda.connector.runtime.inbound.executable; + +import io.camunda.connector.api.error.ConnectorRetryException; +import io.camunda.connector.api.inbound.Health; +import java.time.Duration; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Handles exceptions thrown during the cancellation process of an {@link + * io.camunda.connector.api.inbound.InboundConnectorContext#cancel(Throwable) cancel} operation. + * This class determines the appropriate action to take based on the specific exception encountered. + */ +public class CancellationManager { + + private static final Logger LOG = LoggerFactory.getLogger(CancellationManager.class); + private final Map executables; + private final ScheduledExecutorService reactivationExecutor; + + public CancellationManager(Map executables) { + this.executables = executables; + this.reactivationExecutor = Executors.newSingleThreadScheduledExecutor(); + } + + public static CancellationManager create(Map executables) { + return new CancellationManager(executables); + } + + public Consumer createCancellationCallback(UUID uuid) { + return (throwable) -> { + switch (throwable) { + case ConnectorRetryException connectorRetryException -> { + handleCancellation(uuid, throwable); + handleRestart(uuid, connectorRetryException); + } + default -> handleCancellation(uuid, throwable); + } + }; + } + + private void handleRestart(UUID uuid, ConnectorRetryException connectorRetryException) { + LOG.warn( + "Inbound connector executable has requested its reactivation in {} {}", + connectorRetryException.getBackoffDuration().getSeconds(), + TimeUnit.SECONDS); + var toReactivate = executables.get(uuid); + if (toReactivate == null) { + LOG.error("Inbound connector executable not found for reactivation, received ID: {}", uuid); + return; + } + this.reactivationExecutor.schedule( + () -> + tryRestart( + uuid, + toReactivate, + connectorRetryException.getBackoffDuration(), + connectorRetryException.getRetries()), + connectorRetryException.getBackoffDuration().getSeconds(), + TimeUnit.SECONDS); + } + + private void tryRestart( + UUID uuid, RegisteredExecutable toReactivate, Duration delay, Integer retryAttempts) { + if (toReactivate instanceof RegisteredExecutable.Cancelled cancelled) { + try { + cancelled.executable().activate(cancelled.context()); + executables.replace( + uuid, new RegisteredExecutable.Activated(cancelled.executable(), cancelled.context())); + LOG.info("Activation successful for ID: {}", uuid); + } catch (Exception e) { + if (retryAttempts == 0) return; + LOG.error( + "Activation failed for ID: {}. Retrying... {} retries left", uuid, retryAttempts - 1); + this.reactivationExecutor.schedule( + () -> tryRestart(uuid, toReactivate, delay, retryAttempts - 1), + delay.getSeconds(), + TimeUnit.SECONDS); + } + } else { + LOG.error("Executable is not in a cancelled state. Cannot reactivate, ID: {}", uuid); + } + } + + private void handleCancellation(UUID uuid, Throwable throwable) { + LOG.warn("Inbound connector executable has requested its cancellation"); + var toCancel = executables.get(uuid); + if (toCancel == null) { + LOG.error("Inbound connector executable not found for the given ID: {}", uuid); + return; + } + if (toCancel instanceof RegisteredExecutable.Activated activated) { + activated.context().reportHealth(Health.down(throwable)); + try { + activated.executable().deactivate(); + executables.replace( + uuid, + new RegisteredExecutable.Cancelled( + activated.executable(), activated.context(), throwable)); + } catch (Exception e) { + LOG.error("Failed to deactivate connector", e); + } + } else { + LOG.error( + "Attempted to cancel an inbound connector executable that is not in the active state: {}", + uuid); + } + } +} diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java index c6fa9f57d7..3bac1f9099 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/InboundExecutableRegistryImpl.java @@ -25,6 +25,7 @@ import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails; import io.camunda.connector.runtime.inbound.executable.InboundExecutableEvent.Deactivated; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.Activated; +import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.Cancelled; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.ConnectorNotRegistered; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.FailedToActivate; import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.InvalidDefinition; @@ -34,12 +35,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.concurrent.*; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,21 +43,19 @@ public class InboundExecutableRegistryImpl implements InboundExecutableRegistry { + private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class); + final Map executables = new ConcurrentHashMap<>(); private final BlockingQueue eventQueue; private final ExecutorService executorService; - private final BatchExecutableProcessor batchExecutableProcessor; - + private final CancellationManager cancellationManager; private final Map executablesByElement = new ConcurrentHashMap<>(); - final Map executables = new HashMap<>(); - - private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class); - private final Map> deduplicationScopesByType; public InboundExecutableRegistryImpl( InboundConnectorFactory connectorFactory, BatchExecutableProcessor batchExecutableProcessor) { this.batchExecutableProcessor = batchExecutableProcessor; + this.cancellationManager = CancellationManager.create(executables); this.executorService = Executors.newSingleThreadExecutor(); eventQueue = new LinkedBlockingQueue<>(); deduplicationScopesByType = @@ -92,28 +86,6 @@ public void publishEvent(InboundExecutableEvent event) { LOG.debug("Event added to the queue: {}", event); } - private final BiConsumer cancellationCallback = - (throwable, id) -> { - LOG.warn("Inbound connector executable has requested its cancellation", throwable); - var toCancel = executables.get(id); - if (toCancel == null) { - LOG.error("Inbound connector executable not found for the given ID: {}", id); - return; - } - if (toCancel instanceof Activated activated) { - activated.context().reportHealth(Health.down(throwable)); - try { - activated.executable().deactivate(); - } catch (Exception e) { - LOG.error("Failed to deactivate connector", e); - } - } else { - LOG.error( - "Attempted to cancel an inbound connector executable that is not in the active state: {}", - id); - } - }; - void handleEvent(InboundExecutableEvent event) { switch (event) { case InboundExecutableEvent.Activated activated -> handleActivated(activated); @@ -148,7 +120,7 @@ private void handleActivated(InboundExecutableEvent.Activated activated) { .forEach(element -> executablesByElement.put(element.element(), id))); var activationResult = - batchExecutableProcessor.activateBatch(groupedConnectors, cancellationCallback); + batchExecutableProcessor.activateBatch(groupedConnectors, cancellationManager); executables.putAll(activationResult); } catch (Exception e) { @@ -241,6 +213,10 @@ private boolean matchesQuery(RegisteredExecutable executable, ActiveExecutableQu invalid.data().connectorElements().stream() .map(InboundConnectorElement::element) .toList(); + case Cancelled cancelled -> + cancelled.context().connectorElements().stream() + .map(InboundConnectorElement::element) + .toList(); }; var type = switch (executable) { @@ -248,6 +224,7 @@ private boolean matchesQuery(RegisteredExecutable executable, ActiveExecutableQu case FailedToActivate failed -> failed.data().connectorElements().getFirst().type(); case ConnectorNotRegistered notRegistered -> notRegistered.data().type(); case InvalidDefinition invalid -> invalid.data().connectorElements().getFirst().type(); + case Cancelled cancelled -> cancelled.context().getDefinition().type(); }; return elements.stream() @@ -311,6 +288,13 @@ private ActiveExecutableResponse mapToResponse(UUID id, RegisteredExecutable con new Error( "Activation failure", "Invalid connector definition: " + invalid.reason())), List.of()); + case Cancelled cancelled -> + new ActiveExecutableResponse( + id, + cancelled.executable().getClass(), + cancelled.context().connectorElements(), + Health.down(cancelled.exceptionThrown()), + cancelled.context().getLogs()); }; } @@ -328,6 +312,7 @@ public void logStatusReport() { failed.data().connectorElements().getFirst().type(); case ConnectorNotRegistered notRegistered -> notRegistered.data().type(); case InvalidDefinition invalid -> invalid.data().type(); + case Cancelled cancelled -> cancelled.context().getDefinition().type(); }, Collectors.toList())) .forEach( @@ -351,6 +336,8 @@ public void logStatusReport() { case ConnectorNotRegistered notRegistered -> notRegistered.data().tenantId(); case InvalidDefinition invalid -> invalid.data().tenantId(); + case Cancelled cancelled -> + cancelled.context().getDefinition().tenantId(); }, Collectors.counting())); groupedByTenant.forEach( diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java index 22b57b1e2b..bce2e462d5 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/executable/RegisteredExecutable.java @@ -16,6 +16,7 @@ */ package io.camunda.connector.runtime.inbound.executable; +import io.camunda.connector.api.inbound.InboundConnectorContext; import io.camunda.connector.api.inbound.InboundConnectorExecutable; import io.camunda.connector.runtime.core.inbound.InboundConnectorReportingContext; import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails; @@ -25,7 +26,14 @@ public sealed interface RegisteredExecutable { record Activated( - InboundConnectorExecutable executable, InboundConnectorReportingContext context) + InboundConnectorExecutable executable, + InboundConnectorReportingContext context) + implements RegisteredExecutable {} + + record Cancelled( + InboundConnectorExecutable executable, + InboundConnectorReportingContext context, + Throwable exceptionThrown) implements RegisteredExecutable {} record ConnectorNotRegistered(ValidInboundConnectorDetails data)