Skip to content

Commit

Permalink
Rm temporal version (#21045)
Browse files Browse the repository at this point in the history
* Rm temporal version

* Remove temporal version

* Update the replayed workflow

* Format

* Fix pmd
  • Loading branch information
benmoriceau authored Jan 6, 2023
1 parent 3a2b040 commit 606812b
Show file tree
Hide file tree
Showing 2 changed files with 1,070 additions and 872 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
Expand All @@ -21,12 +20,10 @@
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.listener.NoopStateListener;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
Expand All @@ -48,22 +45,17 @@
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.EnsureCleanJobStateInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCheckFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity;
Expand Down Expand Up @@ -97,33 +89,6 @@
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {

private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;
private static final int AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION = 1;

private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private static final String CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG = "check_previous_job_or_attempt";
private static final int CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION = 1;

private static final String ENSURE_CLEAN_JOB_STATE = "ensure_clean_job_state";
private static final int ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION = 1;

private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;

private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
private static final String RECORD_METRIC_TAG = "record_metric";
private static final int RECORD_METRIC_CURRENT_VERSION = 1;
private static final String WORKFLOW_CONFIG_TAG = "workflow_config";
private static final int WORKFLOW_CONFIG_CURRENT_VERSION = 1;
private static final String ROUTE_ACTIVITY_TAG = "route_activity";
private static final int ROUTE_ACTIVITY_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand Down Expand Up @@ -299,22 +264,12 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));

deleteResetJobStreams();

Expand All @@ -334,25 +289,15 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
final StandardSyncOutput standardSyncOutput,
final FailureCause failureCause,
final Set<FailureReason> failureReasonsOverride) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

final Set<FailureReason> failureReasons = failureReasonsOverride.isEmpty() ? workflowInternalState.getFailures() : failureReasonsOverride;
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand All @@ -371,17 +316,12 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobFailure, new JobFailureInput(connectionUpdaterInput.getJobId(),
connectionUpdaterInput.getAttemptNumber(), connectionUpdaterInput.getConnectionId(), failureReason));

final int autoDisableConnectionVersion =
Workflow.getVersion("auto_disable_failing_connection", Workflow.DEFAULT_VERSION, AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION);

if (autoDisableConnectionVersion != Workflow.DEFAULT_VERSION) {
final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
new AutoDisableConnectionActivityInput(connectionId, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput(
autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
if (output.isDisabled()) {
log.info("Auto-disabled for constantly failing for Connection {}", connectionId);
}
final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
new AutoDisableConnectionActivityInput(connectionId, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput(
autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
if (output.isDisabled()) {
log.info("Auto-disabled for constantly failing for Connection {}", connectionId);
}

// Record the failure metric
Expand All @@ -392,14 +332,6 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
}

private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) {
final int checkJobOutputVersion =
Workflow.getVersion(CHECK_JOB_OUTPUT_TAG, Workflow.DEFAULT_VERSION, CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION);

if (checkJobOutputVersion < CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION) {
final StandardCheckConnectionOutput checkOutput = runMandatoryActivityWithOutput(checkActivity::run, checkInput);
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(checkOutput);
}

return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput);
}

Expand All @@ -412,26 +344,13 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
final IntegrationLauncherConfig destinationLauncherConfig = jobInputs.getDestinationLauncherConfig();
final SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);

final int attemptCreationVersion =
Workflow.getVersion(CHECK_BEFORE_SYNC_TAG, Workflow.DEFAULT_VERSION, CHECK_BEFORE_SYNC_CURRENT_VERSION);

if (attemptCreationVersion < CHECK_BEFORE_SYNC_CURRENT_VERSION) {
// return early if this instance of the workflow was created beforehand
return checkFailure;
}

final StandardCheckConnectionInput sourceConfiguration = new StandardCheckConnectionInput().withConnectionConfiguration(sourceConfig);
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(jobRunConfig, sourceLauncherConfig, sourceConfiguration);

final int checkJobOutputVersion =
Workflow.getVersion(CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG, Workflow.DEFAULT_VERSION, CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION);
boolean isLastJobOrAttemptFailure = true;

if (checkJobOutputVersion >= CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION) {
final JobCheckFailureInput jobStateInput =
new JobCheckFailureInput(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), connectionId);
isLastJobOrAttemptFailure = runMandatoryActivityWithOutput(jobCreationAndStatusUpdateActivity::isLastJobOrAttemptFailure, jobStateInput);
}
final JobCheckFailureInput jobStateInput =
new JobCheckFailureInput(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), connectionId);
final boolean isLastJobOrAttemptFailure =
runMandatoryActivityWithOutput(jobCreationAndStatusUpdateActivity::isLastJobOrAttemptFailure, jobStateInput);
if (isResetJob(sourceLauncherConfig) || checkFailure.isFailed() || !isLastJobOrAttemptFailure) {
// reset jobs don't need to connect to any external source, so check connection is unnecessary
log.info("SOURCE CHECK: Skipped");
Expand Down Expand Up @@ -693,14 +612,6 @@ private Duration getTimeToWait(final UUID connectionId) {
}

private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterInput) {
final int ensureCleanJobStateVersion =
Workflow.getVersion(ENSURE_CLEAN_JOB_STATE, Workflow.DEFAULT_VERSION, ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION);

// For backwards compatibility and determinism, skip if workflow existed before this change
if (ensureCleanJobStateVersion < ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION) {
return;
}

if (connectionUpdaterInput.getJobId() != null) {
log.info("This workflow is already attached to a job, so no need to clean job state.");
return;
Expand All @@ -710,13 +621,6 @@ private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterI
}

private void recordMetric(final RecordMetricInput recordMetricInput) {
final int recordMetricVersion =
Workflow.getVersion(RECORD_METRIC_TAG, Workflow.DEFAULT_VERSION, RECORD_METRIC_CURRENT_VERSION);

if (recordMetricVersion < RECORD_METRIC_CURRENT_VERSION) {
return;
}

runMandatoryActivity(recordMetricActivity::recordWorkflowCountMetric, recordMetricInput);
}

Expand Down Expand Up @@ -747,19 +651,6 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
* @return The attempt number
*/
private Integer createAttempt(final long jobId) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
Expand All @@ -775,20 +666,6 @@ private Integer createAttempt(final long jobId) {
private GeneratedJobInput getJobInput() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptNumber,
jobId);

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
}

final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber(
attemptNumber,
Expand All @@ -802,18 +679,6 @@ private GeneratedJobInput getJobInput() {
}

private String getSyncTaskQueue() {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) {
return TemporalJobType.CONNECTION_UPDATER.name();
}

final int routeActivityVersion = Workflow.getVersion(ROUTE_ACTIVITY_TAG, Workflow.DEFAULT_VERSION, ROUTE_ACTIVITY_CURRENT_VERSION);

if (routeActivityVersion < ROUTE_ACTIVITY_CURRENT_VERSION) {
return TemporalJobType.SYNC.name();
}

final RouteToSyncTaskQueueInput routeToSyncTaskQueueInput = new RouteToSyncTaskQueueInput(connectionId);
final RouteToSyncTaskQueueOutput routeToSyncTaskQueueOutput = runMandatoryActivityWithOutput(
Expand Down Expand Up @@ -910,46 +775,21 @@ private void reportCancelled(final UUID connectionId) {
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}

private void deleteResetJobStreams() {
final int deleteResetJobStreamsVersion =
Workflow.getVersion(DELETE_RESET_JOB_STREAMS_TAG, Workflow.DEFAULT_VERSION, DELETE_RESET_JOB_STREAMS_CURRENT_VERSION);

if (deleteResetJobStreamsVersion < DELETE_RESET_JOB_STREAMS_CURRENT_VERSION) {
return;
}

runMandatoryActivity(streamResetActivity::deleteStreamResetRecordsForJob,
new DeleteStreamResetRecordsForJobInput(connectionId, workflowInternalState.getJobId()));
}

private Duration getWorkflowRestartDelaySeconds() {
final int workflowConfigVersion =
Workflow.getVersion(WORKFLOW_CONFIG_TAG, Workflow.DEFAULT_VERSION, WORKFLOW_CONFIG_CURRENT_VERSION);

if (workflowConfigVersion < WORKFLOW_CONFIG_CURRENT_VERSION) {
return Duration.ofMinutes(10L);
}

return workflowConfigActivity.getWorkflowRestartDelaySeconds();
}

Expand Down
Loading

0 comments on commit 606812b

Please sign in to comment.