Skip to content

Commit

Permalink
Revert "Revert "Remove the attemptId notion in the connectionManagerW…
Browse files Browse the repository at this point in the history
…orkflow (#10780)" (#11057)" (#11073)

This reverts commit 892dc7e.
  • Loading branch information
benmoriceau authored Mar 11, 2022
1 parent 7cce8c7 commit e27bb74
Show file tree
Hide file tree
Showing 12 changed files with 1,304 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public interface JobPersistence {
//

/**
* Create a new attempt for a job. Throws {@link IllegalStateException} if the job is already in a
* terminal state.
* Create a new attempt for a job and return its attempt number. Throws
* {@link IllegalStateException} if the job is already in a terminal state.
*
* @param jobId job for which an attempt will be created
* @param logPath path where logs should be written for the attempt
* @return id of the attempt
* @return The attempt number of the created attempt (see {@link DefaultJobPersistence})
* @throws IOException exception due to interaction with persistence
*/
int createAttempt(long jobId, Path logPath) throws IOException;
Expand Down
2 changes: 1 addition & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies {
implementation project(':airbyte-scheduler:models')

testImplementation 'io.temporal:temporal-testing:1.8.1'
testImplementation 'io.temporal:temporal-testing-junit5:1.5.0' // versioned separately from rest of temporal
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation "org.flywaydb:flyway-core:7.14.0"
testImplementation 'org.mockito:mockito-inline:4.0.0'
testImplementation 'org.postgresql:postgresql:42.2.18'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.GeneratedJobInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInput;
import io.airbyte.workers.temporal.scheduling.activities.GenerateInputActivity.SyncInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.AttemptNumberFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobSuccessInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.ReportJobStartInput;
import io.airbyte.workers.temporal.scheduling.shared.ActivityConfiguration;
import io.airbyte.workers.temporal.scheduling.state.WorkflowInternalState;
Expand Down Expand Up @@ -56,6 +61,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow

private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;

private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
Expand Down Expand Up @@ -141,7 +149,7 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

workflowInternalState.setJobId(getOrCreateJobId(connectionUpdaterInput));

workflowInternalState.setAttemptId(createAttemptId(workflowInternalState.getJobId()));
workflowInternalState.setAttemptNumber(createAttempt(workflowInternalState.getJobId()));

final GeneratedJobInput jobInputs = getJobInput();

Expand Down Expand Up @@ -177,13 +185,13 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
af.getActivityType(),
af.getCause(),
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId()));
workflowInternalState.getAttemptNumber()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
} else {
workflowInternalState.getFailures().add(
FailureHelper.unknownOriginFailure(childWorkflowFailure.getCause(), workflowInternalState.getJobId(),
workflowInternalState.getAttemptId()));
workflowInternalState.getAttemptNumber()));
reportFailure(connectionUpdaterInput, standardSyncOutput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand All @@ -193,20 +201,41 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn

private void reportSuccess(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
workflowState.setSuccess(true);
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
standardSyncOutput));
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccess, new JobSuccessInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobSuccessWithAttemptNumber, new JobSuccessInputWithAttemptNumber(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput));
}

resetNewConnectionInput(connectionUpdaterInput);
}

private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput, final StandardSyncOutput standardSyncOutput) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptId(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailure, new AttemptFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::attemptFailureWithAttemptNumber, new AttemptNumberFailureInput(
workflowInternalState.getJobId(),
workflowInternalState.getAttemptNumber(),
standardSyncOutput,
FailureHelper.failureSummary(workflowInternalState.getFailures(), workflowInternalState.getPartialSuccess())));
}

final int maxAttempt = configFetchActivity.getMaxAttempt().getMaxAttempt();
final int attemptNumber = connectionUpdaterInput.getAttemptNumber();
Expand Down Expand Up @@ -291,20 +320,20 @@ public WorkflowState getState() {
@Override
public JobInformation getJobInformation() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
return new JobInformation(
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId);
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber);
}

@Override
public QuarantinedInformation getQuarantinedInformation() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
return new QuarantinedInformation(
connectionId,
jobId == null ? NON_RUNNING_JOB_ID : jobId,
attemptId == null ? NON_RUNNING_ATTEMPT_ID : attemptId,
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber,
workflowState.isQuarantined());
}

Expand Down Expand Up @@ -404,15 +433,31 @@ private Long getOrCreateJobId(final ConnectionUpdaterInput connectionUpdaterInpu

/**
* Create a new attempt for a given jobId
*
* @param jobId - the jobId associated with the new attempt
*
* @return The attempt number
*/
private Integer createAttemptId(final long jobId) {
final AttemptCreationOutput attemptCreationOutput =
private Integer createAttempt(final long jobId) {
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

// Retrieve the attempt number but name it attempt id
if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final AttemptCreationOutput attemptCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
new AttemptCreationInput(
jobId));
return attemptCreationOutput.getAttemptId();
}

final AttemptNumberCreationOutput attemptNumberCreationOutput =
runMandatoryActivityWithOutput(
jobCreationAndStatusUpdateActivity::createNewAttempt,
jobCreationAndStatusUpdateActivity::createNewAttemptNumber,
new AttemptCreationInput(
jobId));

return attemptCreationOutput.getAttemptId();
return attemptNumberCreationOutput.getAttemptNumber();
}

/**
Expand All @@ -421,14 +466,30 @@ private Integer createAttemptId(final long jobId) {
*/
private GeneratedJobInput getJobInput() {
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptId,
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
final SyncInput getSyncInputActivitySyncInput = new SyncInput(
attemptNumber,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
}

final SyncInputWithAttemptNumber getSyncInputActivitySyncInput = new SyncInputWithAttemptNumber(
attemptNumber,
jobId,
workflowState.isResetConnection());

final GeneratedJobInput syncWorkflowInputs = runMandatoryActivityWithOutput(
getSyncInputActivity::getSyncWorkflowInput,
getSyncInputActivity::getSyncWorkflowInputWithAttemptNumber,
getSyncInputActivitySyncInput);

return syncWorkflowInputs;
Expand Down Expand Up @@ -515,14 +576,25 @@ private void deleteConnectionBeforeTerminatingTheWorkflow() {
private void reportCancelledAndContinueWith(final boolean isReset, final ConnectionUpdaterInput connectionUpdaterInput) {
workflowState.setContinueAsReset(isReset);
final Long jobId = workflowInternalState.getJobId();
final Integer attemptId = workflowInternalState.getAttemptId();
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
final Set<FailureReason> failures = workflowInternalState.getFailures();
final Boolean partialSuccess = workflowInternalState.getPartialSuccess();
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptId,
FailureHelper.failureSummaryForCancellation(jobId, attemptId, failures, partialSuccess)));
final int attemptCreationVersion =
Workflow.getVersion(RENAME_ATTEMPT_ID_TO_NUMBER_TAG, Workflow.DEFAULT_VERSION, RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION);

if (attemptCreationVersion < RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION) {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelled,
new JobCancelledInput(
jobId,
attemptNumber,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
} else {
runMandatoryActivity(jobCreationAndStatusUpdateActivity::jobCancelledWithAttemptNumber,
new JobCancelledInputWithAttemptNumber(
jobId,
attemptNumber,
FailureHelper.failureSummaryForCancellation(jobId, attemptNumber, failures, partialSuccess)));
}
resetNewConnectionInput(connectionUpdaterInput);
prepareForNextRunAndContinueAsNew(connectionUpdaterInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ class SyncInput {

}

@Data
@NoArgsConstructor
@AllArgsConstructor
class SyncInputWithAttemptNumber {

private int attemptNumber;
private long jobId;
private boolean reset;

}

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -45,4 +56,10 @@ class GeneratedJobInput {
@ActivityMethod
GeneratedJobInput getSyncWorkflowInput(SyncInput input);

/**
* This generate the input needed by the child sync workflow
*/
@ActivityMethod
GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(SyncInputWithAttemptNumber input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,12 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
}
}

@Override
public GeneratedJobInput getSyncWorkflowInputWithAttemptNumber(final SyncInputWithAttemptNumber input) {
return getSyncWorkflowInput(new SyncInput(
input.getAttemptNumber(),
input.getJobId(),
input.isReset()));
}

}
Loading

0 comments on commit e27bb74

Please sign in to comment.