Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:(restart-inbound-connectors): create a restart inbound mechanism after cancelling #3505

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.InboundIntermediateConnectorContext;
Expand Down Expand Up @@ -58,7 +59,7 @@ public <T extends InboundConnectorExecutable<?>> InboundConnectorContext createC
final ValidInboundConnectorDetails connectorDetails,
final Consumer<Throwable> cancellationCallback,
final Class<T> executableClass,
final EvictingQueue queue) {
final EvictingQueue<Activity> queue) {

InboundConnectorReportingContext inboundContext =
new InboundConnectorContextImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.camunda.connector.runtime.core.inbound;

import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails.ValidInboundConnectorDetails;
Expand Down Expand Up @@ -51,5 +52,5 @@ <T extends InboundConnectorExecutable<?>> InboundConnectorContext createContext(
final ValidInboundConnectorDetails connectorDetails,
final Consumer<Throwable> cancellationCallback,
final Class<T> executableClass,
final EvictingQueue queue);
final EvictingQueue<Activity> queue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public InboundConnectorContextImpl(
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
EvictingQueue<Activity> logs) {
super(secretProvider, validationProvider);
this.documentFactory = documentFactory;
this.correlationHandler = correlationHandler;
Expand All @@ -89,7 +89,7 @@ public InboundConnectorContextImpl(
InboundCorrelationHandler correlationHandler,
Consumer<Throwable> cancellationCallback,
ObjectMapper objectMapper,
EvictingQueue logs) {
EvictingQueue<Activity> logs) {
this(
secretProvider,
validationProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.EvictingQueue;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
import io.camunda.connector.runtime.core.inbound.InboundConnectorContextFactory;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,8 +75,7 @@ public BatchExecutableProcessor(
* considered valid).
*/
public Map<UUID, RegisteredExecutable> activateBatch(
Map<UUID, InboundConnectorDetails> request,
BiConsumer<Throwable, UUID> cancellationCallback) {
Map<UUID, InboundConnectorDetails> request, CancellationManager cancellationManager) {

final Map<UUID, RegisteredExecutable> alreadyActivated = new HashMap<>();

Expand All @@ -94,12 +93,13 @@ public Map<UUID, RegisteredExecutable> activateBatch(
}

final RegisteredExecutable result =
activateSingle(data, e -> cancellationCallback.accept(e, id));
activateSingle(data, cancellationManager.createCancellationCallback(id));

switch (result) {
case Activated activated -> alreadyActivated.put(id, activated);
case ConnectorNotRegistered notRegistered -> alreadyActivated.put(id, notRegistered);
case InvalidDefinition invalid -> alreadyActivated.put(id, invalid);
case RegisteredExecutable.Cancelled cancelled -> alreadyActivated.put(id, cancelled);
case FailedToActivate failed -> {
LOG.error(
"Failed to activate connector of type '{}' with deduplication ID '{}', reason: {}. "
Expand Down Expand Up @@ -145,7 +145,7 @@ private RegisteredExecutable activateSingle(
}
var validData = (ValidInboundConnectorDetails) data;

final InboundConnectorExecutable executable;
final InboundConnectorExecutable<InboundConnectorContext> executable;
final InboundConnectorReportingContext context;

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.camunda.connector.runtime.inbound.executable;

import io.camunda.connector.api.error.RestartException;
import io.camunda.connector.api.inbound.Health;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CancellationManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide some documentation on what the CancellationManager is responsible for?


/** Logger for cancellation and reactivation events. */
private static final Logger LOG = LoggerFactory.getLogger(CancellationManager.class);

/** Registered executables keyed by ID; this is the caller's map instance, not a copy. */
private final Map<UUID, RegisteredExecutable> executables;

/** Single-threaded scheduler on which delayed reactivation attempts are run. */
private final ScheduledExecutorService reactivationExecutor;

/**
 * Creates a manager that operates on the given map of registered executables.
 *
 * @param executables executables keyed by ID; stored by reference, so entries this manager
 *     replaces are visible to every other holder of the map
 */
public CancellationManager(Map<UUID, RegisteredExecutable> executables) {
  this.reactivationExecutor = Executors.newSingleThreadScheduledExecutor();
  this.executables = executables;
}

/**
 * Static factory equivalent to calling the constructor.
 *
 * @param executables executables keyed by ID, shared with the caller
 * @return a new manager bound to {@code executables}
 */
public static CancellationManager create(Map<UUID, RegisteredExecutable> executables) {
  final CancellationManager manager = new CancellationManager(executables);
  return manager;
}

/**
 * Builds the callback a connector invokes to request its own cancellation.
 *
 * @param uuid ID of the executable the callback is bound to
 * @return a consumer that cancels the executable and, when the cause is a {@link
 *     RestartException}, additionally schedules its reactivation
 */
public Consumer<Throwable> createCancellationCallback(UUID uuid) {
  return cause -> dispatchCancellation(uuid, cause);
}

/** Cancels the executable and, for a {@link RestartException}, also requests a restart. */
private void dispatchCancellation(UUID uuid, Throwable cause) {
  switch (cause) {
    case RestartException restart -> {
      // Cancel first so the entry is in the cancelled state before a restart is scheduled.
      handleCancellation(uuid, restart);
      handleRestart(uuid, restart.getDelay(), restart.getRetryAttempts());
    }
    default -> {
      handleCancellation(uuid, cause);
    }
  }
}

/**
 * Schedules a reactivation attempt for the given executable after {@code delay}.
 *
 * <p>Nothing is scheduled when no entry exists for {@code uuid} at the time of the request.
 *
 * @param uuid ID of the executable to reactivate
 * @param delay time to wait before the reactivation attempt
 * @param retryAttempts retry budget handed on to {@code tryRestart}
 */
private void handleRestart(UUID uuid, Duration delay, Integer retryAttempts) {
  // Work in milliseconds: Duration#getSeconds() drops the sub-second part, so a delay of
  // e.g. 500 ms would otherwise be logged and scheduled as 0 seconds.
  final long delayMillis = delay.toMillis();
  LOG.warn(
      "Inbound connector executable has requested its reactivation in {} {}",
      delayMillis,
      TimeUnit.MILLISECONDS);
  if (executables.get(uuid) == null) {
    LOG.error("Inbound connector executable not found for reactivation, received ID: {}", uuid);
    return;
  }
  this.reactivationExecutor.schedule(
      () -> {
        // The map is shared and may change while the delay elapses, so read the entry when the
        // timer fires instead of reactivating a snapshot taken at request time.
        var toReactivate = executables.get(uuid);
        if (toReactivate == null) {
          LOG.error(
              "Inbound connector executable not found for reactivation, received ID: {}", uuid);
          return;
        }
        tryRestart(uuid, toReactivate, delay, retryAttempts);
      },
      delayMillis,
      TimeUnit.MILLISECONDS);
}

private void tryRestart(
UUID uuid, RegisteredExecutable toReactivate, Duration delay, Integer retryAttempts) {
if (retryAttempts < 0) return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the specification here? Do we stop restarting if the attempts are, for example, -1? It's not clear when this would happen.

Copy link
Contributor Author

@mathias-vandaele mathias-vandaele Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on what we want to do:

  • In this case, retryAttempts means that if the user has set 3 retries, it will try once initially and then retry 3 times
  • In the other case, if the user has set 3 retries, it will try once initially and then try again 2 times

What would a developer expect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, but retryAttempts sounds like the number of attempts that have already happened. I would prefer a sealed interface with, at first, two options: just a restart, and a restart with a delay.
Let's tackle the number of retries later.

if (toReactivate instanceof RegisteredExecutable.Cancelled cancelled) {
try {
cancelled.executable().activate(cancelled.context());
executables.remove(uuid);
executables.put(
uuid, new RegisteredExecutable.Activated(cancelled.executable(), cancelled.context()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use Map.replace() to switch the entry?

LOG.info("Activation successful for ID: {}", uuid);
} catch (Exception e) {
LOG.error("Activation failed for ID: {}. Retrying... {} retries left", uuid, retryAttempts);
this.reactivationExecutor.schedule(
() -> tryRestart(uuid, toReactivate, delay, retryAttempts - 1),
delay.getSeconds(),
TimeUnit.SECONDS);
}
} else {
LOG.error("Executable is not in a cancelled state. Cannot reactivate, ID: {}", uuid);
}
}

private void handleCancellation(UUID uuid, Throwable throwable) {
LOG.warn("Inbound connector executable has requested its cancellation");
var toCancel = executables.get(uuid);
if (toCancel == null) {
LOG.error("Inbound connector executable not found for the given ID: {}", uuid);
return;
}
if (toCancel instanceof RegisteredExecutable.Activated activated) {
activated.context().reportHealth(Health.down(throwable));
try {
activated.executable().deactivate();
executables.remove(uuid);
executables.put(
uuid, new RegisteredExecutable.Cancelled(activated.executable(), activated.context()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use Map.replace() to switch the entry?

} catch (Exception e) {
LOG.error("Failed to deactivate connector", e);
}
} else {
LOG.error(
"Attempted to cancel an inbound connector executable that is not in the active state: {}",
uuid);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails;
import io.camunda.connector.runtime.inbound.executable.InboundExecutableEvent.Deactivated;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.Activated;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.Cancelled;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.ConnectorNotRegistered;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.FailedToActivate;
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.InvalidDefinition;
Expand All @@ -34,34 +35,27 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;

public class InboundExecutableRegistryImpl implements InboundExecutableRegistry {

private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class);
final Map<UUID, RegisteredExecutable> executables = new ConcurrentHashMap<>();
private final BlockingQueue<InboundExecutableEvent> eventQueue;
private final ExecutorService executorService;

private final BatchExecutableProcessor batchExecutableProcessor;

private final CancellationManager cancellationManager;
private final Map<ProcessElement, UUID> executablesByElement = new ConcurrentHashMap<>();
final Map<UUID, RegisteredExecutable> executables = new HashMap<>();

private static final Logger LOG = LoggerFactory.getLogger(InboundExecutableRegistryImpl.class);

private final Map<String, List<String>> deduplicationScopesByType;

public InboundExecutableRegistryImpl(
InboundConnectorFactory connectorFactory, BatchExecutableProcessor batchExecutableProcessor) {
this.batchExecutableProcessor = batchExecutableProcessor;
this.cancellationManager = CancellationManager.create(executables);
this.executorService = Executors.newSingleThreadExecutor();
eventQueue = new LinkedBlockingQueue<>();
deduplicationScopesByType =
Expand Down Expand Up @@ -92,28 +86,6 @@ public void publishEvent(InboundExecutableEvent event) {
LOG.debug("Event added to the queue: {}", event);
}

private final BiConsumer<Throwable, UUID> cancellationCallback =
(throwable, id) -> {
LOG.warn("Inbound connector executable has requested its cancellation", throwable);
var toCancel = executables.get(id);
if (toCancel == null) {
LOG.error("Inbound connector executable not found for the given ID: {}", id);
return;
}
if (toCancel instanceof Activated activated) {
activated.context().reportHealth(Health.down(throwable));
try {
activated.executable().deactivate();
} catch (Exception e) {
LOG.error("Failed to deactivate connector", e);
}
} else {
LOG.error(
"Attempted to cancel an inbound connector executable that is not in the active state: {}",
id);
}
};

void handleEvent(InboundExecutableEvent event) {
switch (event) {
case InboundExecutableEvent.Activated activated -> handleActivated(activated);
Expand Down Expand Up @@ -148,7 +120,7 @@ private void handleActivated(InboundExecutableEvent.Activated activated) {
.forEach(element -> executablesByElement.put(element.element(), id)));

var activationResult =
batchExecutableProcessor.activateBatch(groupedConnectors, cancellationCallback);
batchExecutableProcessor.activateBatch(groupedConnectors, cancellationManager);
executables.putAll(activationResult);

} catch (Exception e) {
Expand Down Expand Up @@ -241,13 +213,18 @@ private boolean matchesQuery(RegisteredExecutable executable, ActiveExecutableQu
invalid.data().connectorElements().stream()
.map(InboundConnectorElement::element)
.toList();
case Cancelled cancelled ->
cancelled.context().connectorElements().stream()
.map(InboundConnectorElement::element)
.toList();
};
var type =
switch (executable) {
case Activated activated -> activated.context().getDefinition().type();
case FailedToActivate failed -> failed.data().connectorElements().getFirst().type();
case ConnectorNotRegistered notRegistered -> notRegistered.data().type();
case InvalidDefinition invalid -> invalid.data().connectorElements().getFirst().type();
case Cancelled cancelled -> cancelled.context().getDefinition().type();
};

return elements.stream()
Expand Down Expand Up @@ -311,6 +288,13 @@ private ActiveExecutableResponse mapToResponse(UUID id, RegisteredExecutable con
new Error(
"Activation failure", "Invalid connector definition: " + invalid.reason())),
List.of());
case Cancelled cancelled ->
new ActiveExecutableResponse(
id,
cancelled.executable().getClass(),
cancelled.context().connectorElements(),
cancelled.context().getHealth(),
cancelled.context().getLogs());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we report the exception that caused the cancellation somewhere? It would be enough if it's part of the health object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true 👍

};
}

Expand All @@ -328,6 +312,7 @@ public void logStatusReport() {
failed.data().connectorElements().getFirst().type();
case ConnectorNotRegistered notRegistered -> notRegistered.data().type();
case InvalidDefinition invalid -> invalid.data().type();
case Cancelled cancelled -> cancelled.context().getDefinition().type();
},
Collectors.toList()))
.forEach(
Expand All @@ -351,6 +336,8 @@ public void logStatusReport() {
case ConnectorNotRegistered notRegistered ->
notRegistered.data().tenantId();
case InvalidDefinition invalid -> invalid.data().tenantId();
case Cancelled cancelled ->
cancelled.context().getDefinition().tenantId();
},
Collectors.counting()));
groupedByTenant.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.camunda.connector.runtime.inbound.executable;

import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.runtime.core.inbound.InboundConnectorReportingContext;
import io.camunda.connector.runtime.core.inbound.details.InboundConnectorDetails;
Expand All @@ -25,7 +26,13 @@
public sealed interface RegisteredExecutable {

record Activated(
InboundConnectorExecutable<?> executable, InboundConnectorReportingContext context)
InboundConnectorExecutable<InboundConnectorContext> executable,
InboundConnectorReportingContext context)
implements RegisteredExecutable {}

/**
 * A connector executable that was deactivated after it requested its own cancellation. The
 * executable and its context are retained so that health and logs can still be reported and the
 * executable can be activated again with the same context.
 */
record Cancelled(
InboundConnectorExecutable<InboundConnectorContext> executable,
InboundConnectorReportingContext context)
implements RegisteredExecutable {}

record ConnectorNotRegistered(ValidInboundConnectorDetails data)
Expand Down
Loading
Loading