Skip to content

Commit

Permalink
Restart workflow after activity failure instead of quarantining (#13779)
Browse files Browse the repository at this point in the history
* use SHORT_ACTIVITY_OPTIONS on check connection activity so that it has retries

* retry workflow after delay instead of quarantining

* allow activity env vars to be configured in docker and kube

* add env var for workflow restart delay and refactor slightly

* update tests to handle new restart behavior

* update test name

* add empty env var values to .env files

* fail attempt before job in cleanJobState to prevent state machine failure

* change default value of max activity attempt retries from 10 to 5
  • Loading branch information
lmossman authored Jun 15, 2022
1 parent e8146e5 commit fc5ba66
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 145 deletions.
5 changes: 5 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5
# Temporal Activity configuration
ACTIVITY_MAX_ATTEMPT=
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=
ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=


### FEATURE FLAGS ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,19 @@ public interface Configs {
int getMaxActivityTimeoutSecond();

/**
* Get the duration in second between 2 activity attempts
* Get initial delay in seconds between two activity attempts
*/
int getDelayBetweenActivityAttempts();
int getInitialDelayBetweenActivityAttemptsSeconds();

/**
* Get maximum delay in seconds between two activity attempts
*/
int getMaxDelayBetweenActivityAttemptsSeconds();

/**
* Get the delay in seconds between an activity failing and the workflow being restarted
*/
int getWorkflowFailureRestartDelaySeconds();

/**
* Get number of attempts of the non long running activities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ public class EnvConfigs implements Configs {

public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND";
public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT";
public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS";
public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String WORKFLOW_FAILURE_RESTART_DELAY_SECONDS = "WORKFLOW_FAILURE_RESTART_DELAY_SECONDS";

private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS";
private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS";
Expand Down Expand Up @@ -843,13 +845,23 @@ public int getMaxActivityTimeoutSecond() {
}

@Override
public int getDelayBetweenActivityAttempts() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30"));
public int getInitialDelayBetweenActivityAttemptsSeconds() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS, "30"));
}

@Override
public int getMaxDelayBetweenActivityAttemptsSeconds() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60)));
}

@Override
public int getWorkflowFailureRestartDelaySeconds() {
return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60)));
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10"));
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
}

// Helpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
public static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
return ConnectionUpdaterInput.builder()
.connectionId(connectionId)
.jobId(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo
private static final Configs configs = new EnvConfigs();
public static final RetryOptions RETRY = RetryOptions.newBuilder()
.setMaximumAttempts(configs.getActivityNumberOfAttempt())
.setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttempts()))
.setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds()))
.setMaximumInterval(Duration.ofSeconds(configs.getMaxDelayBetweenActivityAttemptsSeconds()))
.build();

public static final String DEFAULT_NAMESPACE = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.StandardCheckConnectionInput;
Expand All @@ -17,6 +18,7 @@
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.temporal.ConnectionManagerUtils;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
Expand Down Expand Up @@ -86,6 +88,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand All @@ -101,7 +105,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private final AutoDisableConnectionActivity autoDisableConnectionActivity =
Workflow.newActivityStub(AutoDisableConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
private final CheckConnectionActivity checkActivity =
Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS);
Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);

private CancellationScope cancellableSyncWorkflow;

Expand Down Expand Up @@ -151,11 +155,6 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();

// Clean the job state by failing any jobs for this connection that are currently non-terminal.
// This catches cases where the temporal workflow was terminated and restarted while a job was
// actively running, leaving that job in an orphaned and non-terminal state.
ensureCleanJobState(connectionUpdaterInput);

// workflow state is only ever set in test cases. for production cases, it will always be null.
if (connectionUpdaterInput.getWorkflowState() != null) {
workflowState = connectionUpdaterInput.getWorkflowState();
Expand All @@ -171,6 +170,11 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
workflowState.setResetWithScheduling(true);
}

// Clean the job state by failing any jobs for this connection that are currently non-terminal.
// This catches cases where the temporal workflow was terminated and restarted while a job was
// actively running, leaving that job in an orphaned and non-terminal state.
ensureCleanJobState(connectionUpdaterInput);

final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());

Workflow.await(timeToWait,
Expand Down Expand Up @@ -476,23 +480,52 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn

/**
* This is running a lambda function that takes {@param input} as an input. If the run of the lambda
* is thowing an exception, the workflow will be in a quarantined state and can then be manual
* un-quarantined or a retry of the failed lambda can be trigger through a signal method.
* throws an exception, the workflow will retried after a short delay.
*
* Note that if the lambda activity is configured to have retries, the exception will only be caught
* after the activity has been retried the maximum number of times.
*
* We aimed to use this method for call of the temporal activity.
* This method is meant to be used for calling temporal activities.
*/
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INPUT, OUTPUT> mapper, final INPUT input) {
try {
return mapper.apply(input);
} catch (final Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setQuarantined(true);
workflowState.setRetryFailedActivity(false);
Workflow.await(() -> workflowState.isRetryFailedActivity());
log.error("Retrying an activity for the connection " + connectionId, e);
workflowState.setQuarantined(false);
workflowState.setRetryFailedActivity(false);
return runMandatoryActivityWithOutput(mapper, input);
log.error("[ACTIVITY-FAILURE] Connection " + connectionId +
" failed to run an activity. Connection manager workflow will be restarted after a delay of " +
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e);
// TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification

// Wait a short delay before restarting workflow. This is important if, for example, the failing
// activity was configured to not have retries.
// Without this delay, that activity could cause the workflow to loop extremely quickly,
// overwhelming temporal.
log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.",
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(),
connectionId);
Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity());

// Accept a manual signal to retry the failed activity during this window
if (workflowState.isRetryFailedActivity()) {
log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId);
workflowState.setRetryFailedActivity(false);
return runMandatoryActivityWithOutput(mapper, input);
}

log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId);

final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId);
// this ensures that the new workflow will still perform a reset if an activity failed while
// attempting to reset the connection
if (workflowState.isResetConnection()) {
newWorkflowInput.setResetConnection(true);
newWorkflowInput.setFromJobResetFailure(true);
}

Workflow.continueAsNew(newWorkflowInput);

throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection "
+ connectionId + " was continued as new.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,6 @@ private void failNonTerminalJobs(final UUID connectionId) {
io.airbyte.scheduler.models.JobStatus.NON_TERMINAL_STATUSES);
for (final Job job : jobs) {
final long jobId = job.getId();
log.info("Failing non-terminal job {}", jobId);
jobPersistence.failJob(jobId);

// fail all non-terminal attempts
for (final Attempt attempt : job.getAttempts()) {
Expand All @@ -296,11 +294,15 @@ private void failNonTerminalJobs(final UUID connectionId) {

// the Attempt object 'id' is actually the value of the attempt_number column in the db
final int attemptNumber = (int) attempt.getId();
log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId);
jobPersistence.failAttempt(jobId, attemptNumber);
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber,
FailureHelper.failureSummaryForTemporalCleaningJobState(jobId, attemptNumber));
}

log.info("Failing non-terminal job {}", jobId);
jobPersistence.failJob(jobId);

final Job failedJob = jobPersistence.getJob(jobId);
jobNotifier.failJob("Failing job in order to start from clean job state for new temporal workflow run.", failedJob);
trackCompletion(failedJob, JobStatus.FAILED);
Expand Down
Loading

0 comments on commit fc5ba66

Please sign in to comment.