Skip to content

Commit

Permalink
feat(restart-inbound): give the possibility to the developer to plan …
Browse files Browse the repository at this point in the history
…a restart of the connector in case of failure
  • Loading branch information
mathias-vandaele committed Oct 15, 2024
1 parent 2e5bb3b commit cfae37a
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime.core.error;

import java.time.Duration;

public class RestartException extends RuntimeException {

private final Duration backoffTime;

public RestartException(String message, Throwable cause, Duration backoffTime) {
super(message, cause);
this.backoffTime = backoffTime;
}

public Duration getBackoffTime() {
return backoffTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext;
Expand All @@ -28,6 +29,7 @@
import io.camunda.document.factory.DocumentFactory;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.function.Consumer;

public class DefaultInboundConnectorContextFactory implements InboundConnectorContextFactory {
Expand Down Expand Up @@ -57,8 +59,9 @@ public DefaultInboundConnectorContextFactory(
public <T extends InboundConnectorExecutable<?>> InboundConnectorContext createContext(
final ValidInboundConnectorDetails connectorDetails,
final Consumer<Throwable> cancellationCallback,
final Consumer<Duration> reactivationCallback,
final Class<T> executableClass,
final EvictingQueue queue) {
final EvictingQueue<Activity> queue) {

InboundConnectorReportingContext inboundContext =
new InboundConnectorContextImpl(
Expand All @@ -68,6 +71,7 @@ public <T extends InboundConnectorExecutable<?>> InboundConnectorContext createC
connectorDetails,
correlationHandler,
cancellationCallback,
reactivationCallback,
objectMapper,
queue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package io.camunda.connector.runtime.core.inbound;

import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import java.time.Duration;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -50,6 +52,7 @@ public interface InboundConnectorContextFactory {
<T extends InboundConnectorExecutable<?>> InboundConnectorContext createContext(
final ValidInboundConnectorDetails connectorDetails,
final Consumer<Throwable> cancellationCallback,
final Consumer<Duration> reactivationCallback,
final Class<T> executableClass,
final EvictingQueue queue);
final EvictingQueue<Activity> queue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.camunda.connector.api.validation.ValidationProvider;
import io.camunda.connector.feel.FeelEngineWrapperException;
import io.camunda.connector.runtime.core.AbstractConnectorContext;
import io.camunda.connector.runtime.core.error.RestartException;
import io.camunda.connector.runtime.core.inbound.correlation.InboundCorrelationHandler;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
Expand All @@ -36,6 +37,7 @@
import io.camunda.document.factory.DocumentFactoryImpl;
import io.camunda.document.store.DocumentCreationRequest;
import io.camunda.document.store.InMemoryDocumentStore;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -56,6 +58,7 @@ public class InboundConnectorContextImpl extends AbstractConnectorContext
private final ObjectMapper objectMapper;

private final Consumer<Throwable> cancellationCallback;
private final Consumer<Duration> reactivationCallback;
private final EvictingQueue<Activity> logs;
private final DocumentFactory documentFactory;
private Health health = Health.unknown();
Expand All @@ -68,8 +71,9 @@ public InboundConnectorContextImpl(
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
Consumer<Duration> reactivationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
EvictingQueue<Activity> logs) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.correlationHandler = correlationHandler;
Expand All @@ -79,6 +83,7 @@ public InboundConnectorContextImpl(
connectorDetails.rawPropertiesWithoutKeywords());
this.objectMapper = objectMapper;
this.cancellationCallback = cancellationCallback;
this.reactivationCallback = reactivationCallback;
this.logs = logs;
}

Expand All @@ -88,6 +93,7 @@ public InboundConnectorContextImpl(
ValidInboundConnectorDetails connectorDetails,
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
Consumer<Duration> reactivationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
this(
Expand All @@ -97,6 +103,7 @@ public InboundConnectorContextImpl(
connectorDetails,
correlationHandler,
cancellationCallback,
reactivationCallback,
objectMapper,
logs);
}
Expand Down Expand Up @@ -125,6 +132,9 @@ public void cancel(Throwable exception) {
} catch (Throwable e) {
LOG.error("Failed to deliver the cancellation signal to the runtime", e);
}
if (Objects.requireNonNull(exception) instanceof RestartException restartException) {
reactivationCallback.accept(restartException.getBackoffTime());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
import io.camunda.connector.runtime.core.secret.SecretProviderAggregator;
import io.camunda.document.factory.DocumentFactory;
import java.time.Duration;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -44,6 +45,7 @@ class DefaultInboundConnectorContextFactoryTest {
@Mock private ValidationProvider validationProvider;
@Mock private OperateClientAdapter operateClientAdapter;
@Mock private Consumer<Throwable> cancellationCallback;
@Mock private Consumer<Duration> reactivationCallback;
@Mock private ValidInboundConnectorDetails newConnector;
@Mock private DocumentFactory documentFactory;
private DefaultInboundConnectorContextFactory factory;
Expand All @@ -66,6 +68,7 @@ void shouldCreateInboundConnectorContext() {
factory.createContext(
newConnector,
cancellationCallback,
reactivationCallback,
ExecutableWithInboundContext.class,
EvictingQueue.create(10));

Expand All @@ -78,6 +81,7 @@ void shouldCreateInboundConnectorContextWhenParameterizedTypeIsEmpty() {
factory.createContext(
newConnector,
cancellationCallback,
reactivationCallback,
ExecutableWithEmptyParameterizedType.class,
EvictingQueue.create(10));

Expand All @@ -91,6 +95,7 @@ void shouldCreateInboundIntermediateConnectorContext() {
factory.createContext(
newConnector,
cancellationCallback,
reactivationCallback,
ExecutableWithIntermediate.class,
EvictingQueue.create(10));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ class InboundConnectorContextImplTest {
private final SecretProvider secretProvider = new FooBarSecretProvider();
private final ObjectMapper mapper = ConnectorsObjectMapperSupplier.DEFAULT_MAPPER;

@NotNull
private static ValidInboundConnectorDetails getInboundConnectorDefinition(
Map<String, String> properties) {
properties = new HashMap<>(properties);
properties.put("inbound.type", "io.camunda:connector:1");
InboundConnectorElement element =
new InboundConnectorElement(
properties,
new StandaloneMessageCorrelationPoint("", "", null, null),
new ProcessElement("bool", 0, 0, "id", "<default>"));
var details = InboundConnectorDetails.of(element.deduplicationId(List.of()), List.of(element));
assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class);
return (ValidInboundConnectorDetails) details;
}

@Test
void bindProperties_shouldThrowExceptionWhenWrongFormat() {
// given
Expand All @@ -52,6 +67,7 @@ void bindProperties_shouldThrowExceptionWhenWrongFormat() {
definition,
null,
(e) -> {},
(d) -> {},
mapper,
EvictingQueue.create(10));
// when and then
Expand All @@ -73,6 +89,7 @@ void bindProperties_shouldParseNullValue() {
definition,
null,
(e) -> {},
(d) -> {},
mapper,
EvictingQueue.create(10));
// when
Expand All @@ -98,6 +115,7 @@ void bindProperties_shouldParseStringAsString() {
definition,
null,
(e) -> {},
(d) -> {},
mapper,
EvictingQueue.create(10));
// when
Expand All @@ -108,21 +126,6 @@ void bindProperties_shouldParseStringAsString() {
.isInstanceOf(String.class);
}

@NotNull
private static ValidInboundConnectorDetails getInboundConnectorDefinition(
Map<String, String> properties) {
properties = new HashMap<>(properties);
properties.put("inbound.type", "io.camunda:connector:1");
InboundConnectorElement element =
new InboundConnectorElement(
properties,
new StandaloneMessageCorrelationPoint("", "", null, null),
new ProcessElement("bool", 0, 0, "id", "<default>"));
var details = InboundConnectorDetails.of(element.deduplicationId(List.of()), List.of(element));
assertThat(details).isInstanceOf(ValidInboundConnectorDetails.class);
return (ValidInboundConnectorDetails) details;
}

@Test
void bindProperties_shouldParseAllObject() {
// Given
Expand Down Expand Up @@ -156,6 +159,7 @@ void bindProperties_shouldParseAllObject() {
definition,
null,
(e) -> {},
(d) -> {},
mapper,
EvictingQueue.create(10));
// when
Expand All @@ -177,6 +181,7 @@ void getProperties_shouldNotParseFeel() {
definition,
null,
(e) -> {},
(d) -> {},
mapper,
EvictingQueue.create(10));

Expand Down Expand Up @@ -265,8 +270,6 @@ public void setBool(final boolean bool) {
this.bool = bool;
}

public record InnerObject(List<String> stringList, boolean bool) {}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -325,5 +328,7 @@ public String toString() {
+ bool
+ "}";
}

public record InnerObject(List<String> stringList, boolean bool) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
import io.camunda.connector.runtime.core.inbound.InboundConnectorContextFactory;
Expand All @@ -33,14 +34,15 @@
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.connector.runtime.metrics.ConnectorMetrics.Inbound;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -49,15 +51,14 @@
public class BatchExecutableProcessor {

private static final Logger LOG = LoggerFactory.getLogger(BatchExecutableProcessor.class);

@Value("${camunda.connector.inbound.log.size:10}")
private int inboundLogsSize;

private final InboundConnectorFactory connectorFactory;
private final InboundConnectorContextFactory connectorContextFactory;
private final MetricsRecorder metricsRecorder;
private final WebhookConnectorRegistry webhookConnectorRegistry;

@Value("${camunda.connector.inbound.log.size:10}")
private int inboundLogsSize;

public BatchExecutableProcessor(
InboundConnectorFactory connectorFactory,
InboundConnectorContextFactory connectorContextFactory,
Expand All @@ -76,7 +77,8 @@ public BatchExecutableProcessor(
*/
public Map<UUID, RegisteredExecutable> activateBatch(
Map<UUID, InboundConnectorDetails> request,
BiConsumer<Throwable, UUID> cancellationCallback) {
Function<UUID, Consumer<Throwable>> cancellationCallbackMaker,
Function<UUID, Consumer<Duration>> reactivationCallbackMaker) {

final Map<UUID, RegisteredExecutable> alreadyActivated = new HashMap<>();

Expand All @@ -94,7 +96,8 @@ public Map<UUID, RegisteredExecutable> activateBatch(
}

final RegisteredExecutable result =
activateSingle(data, e -> cancellationCallback.accept(e, id));
activateSingle(
data, cancellationCallbackMaker.apply(id), reactivationCallbackMaker.apply(id));

switch (result) {
case Activated activated -> alreadyActivated.put(id, activated);
Expand Down Expand Up @@ -138,14 +141,16 @@ public Map<UUID, RegisteredExecutable> activateBatch(
}

private RegisteredExecutable activateSingle(
InboundConnectorDetails data, Consumer<Throwable> cancellationCallback) {
InboundConnectorDetails data,
Consumer<Throwable> cancellationCallback,
Consumer<Duration> reactivationCallback) {

if (data instanceof InvalidInboundConnectorDetails invalid) {
return new InvalidDefinition(invalid, invalid.error().getMessage());
}
var validData = (ValidInboundConnectorDetails) data;

final InboundConnectorExecutable executable;
final InboundConnectorExecutable<InboundConnectorContext> executable;
final InboundConnectorReportingContext context;

try {
Expand All @@ -155,6 +160,7 @@ private RegisteredExecutable activateSingle(
connectorContextFactory.createContext(
validData,
cancellationCallback,
reactivationCallback,
executable.getClass(),
EvictingQueue.create(inboundLogsSize));
} catch (NoSuchElementException e) {
Expand Down
Loading

0 comments on commit cfae37a

Please sign in to comment.