From fc5ba66e1c60ff874d55ba17e5777372b5862b21 Mon Sep 17 00:00:00 2001 From: Lake Mossman Date: Wed, 15 Jun 2022 16:07:47 -0700 Subject: [PATCH] Restart workflow after activity failure instead of quarantining (#13779) * use SHORT_ACTIVITY_OPTIONS on check connection activity so that it has retries * retry workflow after delay instead of quarantining * allow activity env vars to be configured in docker and kube * add env var for workflow restart delay and refactor slightly * update tests to handle new restart behavior * update test name * add empty env var values to .env files * fail attempt before job in cleanJobState to prevent state machine failure * change default value of max activity attempt retries from 10 to 5 --- .env | 5 + .../main/java/io/airbyte/config/Configs.java | 14 +- .../java/io/airbyte/config/EnvConfigs.java | 20 ++- .../temporal/ConnectionManagerUtils.java | 2 +- .../workers/temporal/TemporalUtils.java | 3 +- .../ConnectionManagerWorkflowImpl.java | 67 ++++++-- ...obCreationAndStatusUpdateActivityImpl.java | 6 +- .../ConnectionManagerWorkflowTest.java | 151 ++++-------------- charts/airbyte/templates/env-configmap.yaml | 4 + .../airbyte/templates/worker/deployment.yaml | 20 +++ docker-compose.yaml | 4 + kube/overlays/dev-integration-test/.env | 6 + kube/overlays/dev/.env | 6 + .../overlays/stable-with-resource-limits/.env | 6 + kube/overlays/stable/.env | 6 + kube/resources/worker.yaml | 20 +++ 16 files changed, 195 insertions(+), 145 deletions(-) diff --git a/.env b/.env index 44f10610877b..3a6aac913b07 100644 --- a/.env +++ b/.env @@ -83,6 +83,11 @@ MAX_SYNC_WORKERS=5 MAX_SPEC_WORKERS=5 MAX_CHECK_WORKERS=5 MAX_DISCOVER_WORKERS=5 +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= ### FEATURE FLAGS ### diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java 
b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index f1dfda304d72..8381f15262da 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -508,9 +508,19 @@ public interface Configs { int getMaxActivityTimeoutSecond(); /** - * Get the duration in second between 2 activity attempts + * Get initial delay in seconds between two activity attempts */ - int getDelayBetweenActivityAttempts(); + int getInitialDelayBetweenActivityAttemptsSeconds(); + + /** + * Get maximum delay in seconds between two activity attempts + */ + int getMaxDelayBetweenActivityAttemptsSeconds(); + + /** + * Get the delay in seconds between an activity failing and the workflow being restarted + */ + int getWorkflowFailureRestartDelaySeconds(); /** * Get number of attempts of the non long running activities diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 4152df6bbdee..486575bc242e 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -111,7 +111,9 @@ public class EnvConfigs implements Configs { public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND"; public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; - public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS"; + public static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS"; + public static final String WORKFLOW_FAILURE_RESTART_DELAY_SECONDS = 
"WORKFLOW_FAILURE_RESTART_DELAY_SECONDS"; private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS"; private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS"; @@ -843,13 +845,23 @@ public int getMaxActivityTimeoutSecond() { } @Override - public int getDelayBetweenActivityAttempts() { - return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + public int getInitialDelayBetweenActivityAttemptsSeconds() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS, "30")); + } + + @Override + public int getMaxDelayBetweenActivityAttemptsSeconds() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60))); + } + + @Override + public int getWorkflowFailureRestartDelaySeconds() { + return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60))); } @Override public int getActivityNumberOfAttempt() { - return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10")); + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); } // Helpers diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java index df8d8662583c..ee4d9bf7d38f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java @@ -234,7 +234,7 @@ static String getConnectionManagerName(final UUID connectionId) { return "connection_manager_" + connectionId; } - static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) { + public static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) { return ConnectionUpdaterInput.builder() .connectionId(connectionId) .jobId(null) diff 
--git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index fcae590544d0..33e1dcba1dc1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -66,7 +66,8 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo private static final Configs configs = new EnvConfigs(); public static final RetryOptions RETRY = RetryOptions.newBuilder() .setMaximumAttempts(configs.getActivityNumberOfAttempt()) - .setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttempts())) + .setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds())) + .setMaximumInterval(Duration.ofSeconds(configs.getMaxDelayBetweenActivityAttemptsSeconds())) .build(); public static final String DEFAULT_NAMESPACE = "default"; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 09ff356c9c2a..e02a3ecbf27a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.scheduling; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureType; import io.airbyte.config.StandardCheckConnectionInput; @@ -17,6 +18,7 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.helper.FailureHelper; +import 
io.airbyte.workers.temporal.ConnectionManagerUtils; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput; @@ -86,6 +88,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync"; private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1; + static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds()); + private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); private final WorkflowInternalState workflowInternalState = new WorkflowInternalState(); @@ -101,7 +105,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private final AutoDisableConnectionActivity autoDisableConnectionActivity = Workflow.newActivityStub(AutoDisableConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final CheckConnectionActivity checkActivity = - Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS); + Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private CancellationScope cancellableSyncWorkflow; @@ -151,11 +155,6 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn return Workflow.newCancellationScope(() -> { connectionId = connectionUpdaterInput.getConnectionId(); - // Clean the job state by failing any jobs for this connection that are currently non-terminal. - // This catches cases where the temporal workflow was terminated and restarted while a job was - // actively running, leaving that job in an orphaned and non-terminal state. 
- ensureCleanJobState(connectionUpdaterInput); - // workflow state is only ever set in test cases. for production cases, it will always be null. if (connectionUpdaterInput.getWorkflowState() != null) { workflowState = connectionUpdaterInput.getWorkflowState(); @@ -171,6 +170,11 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn workflowState.setResetWithScheduling(true); } + // Clean the job state by failing any jobs for this connection that are currently non-terminal. + // This catches cases where the temporal workflow was terminated and restarted while a job was + // actively running, leaving that job in an orphaned and non-terminal state. + ensureCleanJobState(connectionUpdaterInput); + final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); Workflow.await(timeToWait, @@ -476,23 +480,52 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn /** * This is running a lambda function that takes {@param input} as an input. If the run of the lambda - * is thowing an exception, the workflow will be in a quarantined state and can then be manual - * un-quarantined or a retry of the failed lambda can be trigger through a signal method. + * throws an exception, the workflow will be retried after a short delay. + * + * Note that if the lambda activity is configured to have retries, the exception will only be caught + * after the activity has been retried the maximum number of times. * - * We aimed to use this method for call of the temporal activity. + * This method is meant to be used for calling temporal activities. 
*/ private OUTPUT runMandatoryActivityWithOutput(final Function mapper, final INPUT input) { try { return mapper.apply(input); } catch (final Exception e) { - log.error("Failed to run an activity for the connection " + connectionId, e); - workflowState.setQuarantined(true); - workflowState.setRetryFailedActivity(false); - Workflow.await(() -> workflowState.isRetryFailedActivity()); - log.error("Retrying an activity for the connection " + connectionId, e); - workflowState.setQuarantined(false); - workflowState.setRetryFailedActivity(false); - return runMandatoryActivityWithOutput(mapper, input); + log.error("[ACTIVITY-FAILURE] Connection " + connectionId + + " failed to run an activity. Connection manager workflow will be restarted after a delay of " + + WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e); + // TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification + + // Wait a short delay before restarting workflow. This is important if, for example, the failing + // activity was configured to not have retries. + // Without this delay, that activity could cause the workflow to loop extremely quickly, + // overwhelming temporal. + log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.", + WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(), + connectionId); + Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity()); + + // Accept a manual signal to retry the failed activity during this window + if (workflowState.isRetryFailedActivity()) { + log.info("Received RetryFailedActivity signal for connection {}. 
Retrying activity.", connectionId); + workflowState.setRetryFailedActivity(false); + return runMandatoryActivityWithOutput(mapper, input); + } + + log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId); + + final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId); + // this ensures that the new workflow will still perform a reset if an activity failed while + // attempting to reset the connection + if (workflowState.isResetConnection()) { + newWorkflowInput.setResetConnection(true); + newWorkflowInput.setFromJobResetFailure(true); + } + + Workflow.continueAsNew(newWorkflowInput); + + throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection " + + connectionId + " was continued as new."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 3b077d88a87f..dc37eb4a731f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -285,8 +285,6 @@ private void failNonTerminalJobs(final UUID connectionId) { io.airbyte.scheduler.models.JobStatus.NON_TERMINAL_STATUSES); for (final Job job : jobs) { final long jobId = job.getId(); - log.info("Failing non-terminal job {}", jobId); - jobPersistence.failJob(jobId); // fail all non-terminal attempts for (final Attempt attempt : job.getAttempts()) { @@ -296,11 +294,15 @@ private void failNonTerminalJobs(final UUID connectionId) { // the Attempt object 'id' is actually the value of the attempt_number column in the db final int attemptNumber = (int) 
attempt.getId(); + log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId); jobPersistence.failAttempt(jobId, attemptNumber); jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, FailureHelper.failureSummaryForTemporalCleaningJobState(jobId, attemptNumber)); } + log.info("Failing non-terminal job {}", jobId); + jobPersistence.failJob(jobId); + final Job failedJob = jobPersistence.getJob(jobId); jobNotifier.failJob("Failing job in order to start from clean job state for new temporal workflow run.", failedJob); trackCompletion(failedJob, JobStatus.FAILED); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index fcabecd63a7a..83884db872c9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -46,6 +46,10 @@ import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SyncWorkflowFailingWithHearbeatTimeoutException; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SyncWorkflowWithActivityFailureException; import io.airbyte.workers.temporal.sync.SyncWorkflow; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.filter.v1.WorkflowExecutionFilter; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.failure.ApplicationFailure; @@ -57,7 +61,6 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; 
import org.assertj.core.api.Assertions; @@ -88,6 +91,7 @@ public class ConnectionManagerWorkflowTest { private static final int ATTEMPT_ID = 1; private static final Duration SCHEDULE_WAIT = Duration.ofMinutes(20L); + private static final String WORKFLOW_ID = "workflow-id"; private final ConfigFetchActivity mConfigFetchActivity = Mockito.mock(ConfigFetchActivity.class, Mockito.withSettings().withoutAnnotations()); @@ -1221,15 +1225,15 @@ public void testReplicationFailureRecorded() throws InterruptedException { } @Nested - @DisplayName("Test that the workflow are properly getting stuck") - class StuckWorkflow { + @DisplayName("Test that the workflow is properly restarted after activity failures.") + class FailedActivityWorkflow { @BeforeEach public void setup() { setupSpecificChildWorkflow(SleepingSyncWorkflow.class); } - public static Stream getSetupFailingFailingActivityBeforeRun() { + public static Stream getSetupFailingActivity() { return Stream.of( Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), @@ -1243,8 +1247,8 @@ public static Stream getSetupFailingFailingActivityBeforeRun() { } @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityBeforeRun") - void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { + @MethodSource("getSetupFailingActivity") + void testWorkflowRestartedAfterFailedActivity(final Thread mockSetup) throws InterruptedException { mockSetup.run(); Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( Duration.ZERO)); @@ -1266,7 +1270,10 @@ void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { .build(); startWorkflowAndWaitUntilReady(workflow, input); - testEnv.sleep(Duration.ofMinutes(2L)); + + // Sleep test env for restart delay, plus a small buffer to ensure that the workflow executed the + // logic after the 
delay + testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.plus(Duration.ofSeconds(10))); final Queue events = testStateListener.events(testId); @@ -1274,13 +1281,11 @@ void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) .isEmpty(); - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSize(1); + assertWorkflowWasContinuedAsNew(); } @Test - void testCanGetUnstuck() throws InterruptedException { + void testCanRetryFailedActivity() throws InterruptedException { Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) .thenReturn(new JobCreationOutput(1l)); @@ -1305,76 +1310,18 @@ void testCanGetUnstuck() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); - testEnv.sleep(Duration.ofSeconds(80L)); + // Sleep test env for half of restart delay, so that we know we are in the middle of the delay + testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2)); workflow.retryFailedActivity(); testEnv.sleep(Duration.ofSeconds(30L)); final Queue events = testStateListener.events(testId); - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - Assertions.assertThat(events) .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) .hasSize(1); } - public static Stream getSetupFailingFailingActivityAfterRun() { - return Stream.of( - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), - new Thread(() -> 
Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobSuccessWithAttemptNumber(Mockito.any(JobSuccessInputWithAttemptNumber.class)))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobCancelledWithAttemptNumber(Mockito.any(JobCancelledInputWithAttemptNumber.class)))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mConnectionDeletionActivity).deleteConnection(Mockito.any())))); - } - - @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityAfterRun") - void testGetStuckAfterRun(final Consumer signalSender, final Thread mockSetup) throws InterruptedException { - mockSetup.run(); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(UUID.randomUUID()) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(workflowState) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - startWorkflowAndWaitUntilReady(workflow, input); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - workflow.submitManualSync(); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - signalSender.accept(workflow); - - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. 
- testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2)); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - } - } @Nested @@ -1449,53 +1396,6 @@ public void failedResetContinueAsReset() throws InterruptedException { } - @RepeatedTest(10) - @Timeout(value = 2, - unit = TimeUnit.SECONDS) - @DisplayName("Test that we are getting stuck if the report of a failure happen") - void testGetStuckAfterRun() throws InterruptedException { - Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailureWithAttemptNumber(Mockito.any()); - - Mockito.when(mConfigFetchActivity.getMaxAttempt()) - .thenReturn(new GetMaxAttemptOutput(3)); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(UUID.randomUUID()) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(workflowState) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - startWorkflowAndWaitUntilReady(workflow, input); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - workflow.submitManualSync(); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. 
- testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2)); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSize(1); - } - @ParameterizedTest @MethodSource("io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowTest#getMaxAttemptForResetRetry") public void failedResetContinueAttemptAsReset(final int maxAttempt) throws InterruptedException { @@ -1623,6 +1523,7 @@ private void setupSpecificChildWorkflow(final Class ConnectionManagerWorkflow.class, WorkflowOptions.newBuilder() .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .setWorkflowId(WORKFLOW_ID) .build()); } @@ -1687,4 +1588,18 @@ private void runRetryResetWaitsAfterJobFailureTest() throws InterruptedException .isFalse(); } + private void assertWorkflowWasContinuedAsNew() { + final ListClosedWorkflowExecutionsRequest request = ListClosedWorkflowExecutionsRequest.newBuilder() + .setNamespace(testEnv.getNamespace()) + .setExecutionFilter(WorkflowExecutionFilter.newBuilder().setWorkflowId(WORKFLOW_ID)) + .build(); + final ListClosedWorkflowExecutionsResponse listResponse = testEnv + .getWorkflowService() + .blockingStub() + .listClosedWorkflowExecutions(request); + Assertions.assertThat(listResponse.getExecutionsCount()).isGreaterThanOrEqualTo(1); + Assertions.assertThat(listResponse.getExecutionsList().get(0).getStatus()) + .isEqualTo(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW); + } + } diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 384c2f1ba2da..d626cf2e93c4 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -55,3 +55,7 @@ data: WORKSPACE_ROOT: /workspace METRIC_CLIENT: "" OTEL_COLLECTOR_ENDPOINT: "" + ACTIVITY_MAX_ATTEMPT: "" + 
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" + ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" + WORKFLOW_FAILURE_RESTART_DELAY_SECONDS: "" diff --git a/charts/airbyte/templates/worker/deployment.yaml b/charts/airbyte/templates/worker/deployment.yaml index 86d6d1726a5f..f505592f2ca1 100644 --- a/charts/airbyte/templates/worker/deployment.yaml +++ b/charts/airbyte/templates/worker/deployment.yaml @@ -261,6 +261,26 @@ spec: configMapKeyRef: name: {{ include "common.names.fullname" . }}-env key: OTEL_COLLECTOR_ENDPOINT + - name: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: {{ include "common.names.fullname" . }}-env + key: ACTIVITY_MAX_ATTEMPT + - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: {{ include "common.names.fullname" . }}-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: {{ include "common.names.fullname" . }}-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS + valueFrom: + configMapKeyRef: + name: {{ include "common.names.fullname" . 
}}-env + key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS {{- if .Values.worker.extraEnv }} {{ .Values.worker.extraEnv | toYaml | nindent 8 }} {{- end }} diff --git a/docker-compose.yaml b/docker-compose.yaml index cd5f24849c2e..79a53b4d1d26 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -88,6 +88,10 @@ services: - WORKSPACE_ROOT=${WORKSPACE_ROOT} - METRIC_CLIENT=${METRIC_CLIENT} - OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT} + - ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT} + - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS} + - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS} + - WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 77257b428be8..03d3f5cca74d 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -68,3 +68,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index ba31e7322d98..70419187554b 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -70,3 +70,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/stable-with-resource-limits/.env 
b/kube/overlays/stable-with-resource-limits/.env index 3c4230e444ad..71687de6966b 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -72,3 +72,9 @@ CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 93a815e5a140..dc94db50e47e 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -70,3 +70,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 3591812f147f..13ff6ce7ff8d 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -230,6 +230,26 @@ spec: configMapKeyRef: name: airbyte-env key: OTEL_COLLECTOR_ENDPOINT + - name: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_ATTEMPT + - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS ports: - containerPort: 9000 # for heartbeat server - containerPort: 9001 # start temporal worker port pool