Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rm temporal version #21045

Merged
merged 6 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
Expand All @@ -21,12 +20,10 @@
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.commons.temporal.scheduling.state.listener.NoopStateListener;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
Expand All @@ -48,22 +45,17 @@
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.EnsureCleanJobStateInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCheckFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput;
import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity;
Expand Down Expand Up @@ -97,33 +89,6 @@
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow {

private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;
private static final int AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION = 1;

private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private static final String CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG = "check_previous_job_or_attempt";
private static final int CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION = 1;

private static final String ENSURE_CLEAN_JOB_STATE = "ensure_clean_job_state";
private static final int ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION = 1;

private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

private static final String CHECK_JOB_OUTPUT_TAG = "check_job_output";
private static final int CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION = 1;

private static final String DELETE_RESET_JOB_STREAMS_TAG = "delete_reset_job_streams";
private static final int DELETE_RESET_JOB_STREAMS_CURRENT_VERSION = 1;
private static final String RECORD_METRIC_TAG = "record_metric";
private static final int RECORD_METRIC_CURRENT_VERSION = 1;
private static final String WORKFLOW_CONFIG_TAG = "workflow_config";
private static final int WORKFLOW_CONFIG_CURRENT_VERSION = 1;
private static final String ROUTE_ACTIVITY_TAG = "route_activity";
private static final int ROUTE_ACTIVITY_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand Down Expand Up @@ -299,22 +264,12 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput));

deleteResetJobStreams();

Expand All @@ -334,25 +289,15 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
final StandardSyncOutput standardSyncOutput,
final FailureCause failureCause,
final Set<FailureReason> failureReasonsOverride) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

final Set<FailureReason> failureReasons = failureReasonsOverride.isEmpty() ? workflowInternalState.getFailures() : failureReasonsOverride;
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
connectionUpdaterInput.getConnectionId(),
standardSyncOutput,
FailureHelper.failureSummary(failureReasons, workflowInternalState.getPartialSuccess())));

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand All @@ -371,17 +316,12 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobFailure, new JobFailureInput(connectionUpdaterInput.getJobId(),
connectionUpdaterInput.getAttemptNumber(), connectionUpdaterInput.getConnectionId(), failureReason));

final int autoDisableConnectionVersion =
Workflow.getVersion("auto_disable_failing_connection", Workflow.DEFAULT_VERSION, AUTO_DISABLE_FAILING_CONNECTION_CHANGE_CURRENT_VERSION);

if (autoDisableConnectionVersion != Workflow.DEFAULT_VERSION) {
final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
new AutoDisableConnectionActivityInput(connectionId, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput(
autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
if (output.isDisabled()) {
log.info("Auto-disabled for constantly failing for Connection {}", connectionId);
}
final AutoDisableConnectionActivityInput autoDisableConnectionActivityInput =
new AutoDisableConnectionActivityInput(connectionId, Instant.ofEpochMilli(Workflow.currentTimeMillis()));
final AutoDisableConnectionOutput output = runMandatoryActivityWithOutput(
autoDisableConnectionActivity::autoDisableFailingConnection, autoDisableConnectionActivityInput);
if (output.isDisabled()) {
log.info("Auto-disabled for constantly failing for Connection {}", connectionId);
}

// Record the failure metric
Expand All @@ -392,14 +332,6 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
}

private ConnectorJobOutput getCheckResponse(final CheckConnectionInput checkInput) {
final int checkJobOutputVersion =
Workflow.getVersion(CHECK_JOB_OUTPUT_TAG, Workflow.DEFAULT_VERSION, CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION);

if (checkJobOutputVersion < CHECK_JOB_OUTPUT_TAG_CURRENT_VERSION) {
final StandardCheckConnectionOutput checkOutput = runMandatoryActivityWithOutput(checkActivity::run, checkInput);
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(checkOutput);
}

return runMandatoryActivityWithOutput(checkActivity::runWithJobOutput, checkInput);
}

Expand All @@ -412,26 +344,14 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
final IntegrationLauncherConfig destinationLauncherConfig = jobInputs.getDestinationLauncherConfig();
final SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);

final int attemptCreationVersion =
Workflow.getVersion(CHECK_BEFORE_SYNC_TAG, Workflow.DEFAULT_VERSION, CHECK_BEFORE_SYNC_CURRENT_VERSION);

if (attemptCreationVersion < CHECK_BEFORE_SYNC_CURRENT_VERSION) {
// return early if this instance of the workflow was created beforehand
return checkFailure;
}

final StandardCheckConnectionInput sourceConfiguration = new StandardCheckConnectionInput().withConnectionConfiguration(sourceConfig);
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(jobRunConfig, sourceLauncherConfig, sourceConfiguration);

final int checkJobOutputVersion =
Workflow.getVersion(CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG, Workflow.DEFAULT_VERSION, CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION);
boolean isLastJobOrAttemptFailure = true;

if (checkJobOutputVersion >= CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION) {
final JobCheckFailureInput jobStateInput =
new JobCheckFailureInput(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), connectionId);
isLastJobOrAttemptFailure = runMandatoryActivityWithOutput(jobCreationAndStatusUpdateActivity::isLastJobOrAttemptFailure, jobStateInput);
}
final JobCheckFailureInput jobStateInput =
new JobCheckFailureInput(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), connectionId);
isLastJobOrAttemptFailure = runMandatoryActivityWithOutput(jobCreationAndStatusUpdateActivity::isLastJobOrAttemptFailure, jobStateInput);
if (isResetJob(sourceLauncherConfig) || checkFailure.isFailed() || !isLastJobOrAttemptFailure) {
// reset jobs don't need to connect to any external source, so check connection is unnecessary
log.info("SOURCE CHECK: Skipped");
Expand Down Expand Up @@ -693,14 +613,6 @@ private Duration getTimeToWait(final UUID connectionId) {
}

private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterInput) {
final int ensureCleanJobStateVersion =
Workflow.getVersion(ENSURE_CLEAN_JOB_STATE, Workflow.DEFAULT_VERSION, ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION);

// For backwards compatibility and determinism, skip if workflow existed before this change
if (ensureCleanJobStateVersion < ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION) {
return;
}

if (connectionUpdaterInput.getJobId() != null) {
log.info("This workflow is already attached to a job, so no need to clean job state.");
return;
Expand All @@ -710,13 +622,6 @@ private void ensureCleanJobState(final ConnectionUpdaterInput connectionUpdaterI
}

private void recordMetric(final RecordMetricInput recordMetricInput) {
final int recordMetricVersion =
Workflow.getVersion(RECORD_METRIC_TAG, Workflow.DEFAULT_VERSION, RECORD_METRIC_CURRENT_VERSION);

if (recordMetricVersion < RECORD_METRIC_CURRENT_VERSION) {
return;
}

runMandatoryActivity(recordMetricActivity::recordWorkflowCountMetric, recordMetricInput);
}

Expand Down Expand Up @@ -747,19 +652,6 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu
* @return The attempt number
*/
private Integer createAttempt(final long jobId) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
Expand All @@ -775,20 +667,6 @@ private Integer createAttempt(final long jobId) {
private GeneratedJobInput getJobInput() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptNumber,
jobId);

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
}

final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber(
attemptNumber,
Expand All @@ -802,18 +680,6 @@ private GeneratedJobInput getJobInput() {
}

private String getSyncTaskQueue() {
final int taskQueueChangeVersion =
Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);

if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) {
return TemporalJobType.CONNECTION_UPDATER.name();
}

final int routeActivityVersion = Workflow.getVersion(ROUTE_ACTIVITY_TAG, Workflow.DEFAULT_VERSION, ROUTE_ACTIVITY_CURRENT_VERSION);

if (routeActivityVersion < ROUTE_ACTIVITY_CURRENT_VERSION) {
return TemporalJobType.SYNC.name();
}

final RouteToSyncTaskQueueInput routeToSyncTaskQueueInput = new RouteToSyncTaskQueueInput(connectionId);
final RouteToSyncTaskQueueOutput routeToSyncTaskQueueOutput = runMandatoryActivityWithOutput(
Expand Down Expand Up @@ -910,46 +776,21 @@ private void reportCancelled(final UUID connectionId) {
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}

runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
connectionId,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}

private void deleteResetJobStreams() {
final int deleteResetJobStreamsVersion =
Workflow.getVersion(DELETE_RESET_JOB_STREAMS_TAG, Workflow.DEFAULT_VERSION, DELETE_RESET_JOB_STREAMS_CURRENT_VERSION);

if (deleteResetJobStreamsVersion < DELETE_RESET_JOB_STREAMS_CURRENT_VERSION) {
return;
}

runMandatoryActivity(streamResetActivity::deleteStreamResetRecordsForJob,
new DeleteStreamResetRecordsForJobInput(connectionId, workflowInternalState.getJobId()));
}

private Duration getWorkflowRestartDelaySeconds() {
final int workflowConfigVersion =
Workflow.getVersion(WORKFLOW_CONFIG_TAG, Workflow.DEFAULT_VERSION, WORKFLOW_CONFIG_CURRENT_VERSION);

if (workflowConfigVersion < WORKFLOW_CONFIG_CURRENT_VERSION) {
return Duration.ofMinutes(10L);
}

return workflowConfigActivity.getWorkflowRestartDelaySeconds();
}

Expand Down
Loading