Skip to content

Commit

Permalink
Skip CHECKs steps if previous sync success (airbytehq#17999)
Browse files Browse the repository at this point in the history
* Disabled CHECKs for source and destination, when previous sync or previous attempt is failure

* added tests and added minor changes to the isLastJobOrAttemptFailure method

* updated remarks

* updated code style

* updated code style

* fixed remarks

* fixed remarks
  • Loading branch information
andriikorotkov authored and jhammarstedt committed Oct 31, 2022
1 parent b4635e8 commit d7808d1
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.EnsureCleanJobStateInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCancelledInputWithAttemptNumber;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCheckFailureInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationInput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobCreationOutput;
import io.airbyte.workers.temporal.scheduling.activities.JobCreationAndStatusUpdateActivity.JobFailureInput;
Expand Down Expand Up @@ -102,6 +103,9 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String RENAME_ATTEMPT_ID_TO_NUMBER_TAG = "rename_attempt_id_to_number";
private static final int RENAME_ATTEMPT_ID_TO_NUMBER_CURRENT_VERSION = 1;

private static final String CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG = "check_previous_job_or_attempt";
private static final int CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION = 1;

private static final String ENSURE_CLEAN_JOB_STATE = "ensure_clean_job_state";
private static final int ENSURE_CLEAN_JOB_STATE_CURRENT_VERSION = 1;

Expand Down Expand Up @@ -407,7 +411,16 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
final StandardCheckConnectionInput sourceConfiguration = new StandardCheckConnectionInput().withConnectionConfiguration(sourceConfig);
final CheckConnectionInput checkSourceInput = new CheckConnectionInput(jobRunConfig, sourceLauncherConfig, sourceConfiguration);

if (isResetJob(sourceLauncherConfig) || checkFailure.isFailed()) {
final int checkJobOutputVersion =
Workflow.getVersion(CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG, Workflow.DEFAULT_VERSION, CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION);
boolean isLastJobOrAttemptFailure = true;

if (checkJobOutputVersion >= CHECK_PREVIOUS_JOB_OR_ATTEMPT_TAG_CURRENT_VERSION) {
final JobCheckFailureInput jobStateInput =
new JobCheckFailureInput(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), connectionId);
isLastJobOrAttemptFailure = runMandatoryActivityWithOutput(jobCreationAndStatusUpdateActivity::isLastJobOrAttemptFailure, jobStateInput);
}
if (isResetJob(sourceLauncherConfig) || checkFailure.isFailed() || !isLastJobOrAttemptFailure) {
// reset jobs don't need to connect to any external source, so check connection is unnecessary
log.info("SOURCE CHECK: Skipped");
} else {
Expand All @@ -425,7 +438,7 @@ private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.
final StandardCheckConnectionInput destinationConfiguration = new StandardCheckConnectionInput().withConnectionConfiguration(destinationConfig);
final CheckConnectionInput checkDestinationInput = new CheckConnectionInput(jobRunConfig, destinationLauncherConfig, destinationConfiguration);

if (checkFailure.isFailed()) {
if (checkFailure.isFailed() || !isLastJobOrAttemptFailure) {
log.info("DESTINATION CHECK: Skipped");
} else {
log.info("DESTINATION CHECK: Starting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,18 @@ class EnsureCleanJobStateInput {
@ActivityMethod
void ensureCleanJobState(EnsureCleanJobStateInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
class JobCheckFailureInput {

private long jobId;
private int attemptId;
private UUID connectionId;

}

@ActivityMethod
boolean isLastJobOrAttemptFailure(JobCheckFailureInput input);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.workers.temporal.scheduling.activities;

import static io.airbyte.config.JobConfig.ConfigType.SYNC;
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.JOB_ID_KEY;
Expand All @@ -18,6 +20,7 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.NormalizationSummary;
Expand Down Expand Up @@ -56,9 +59,13 @@
import jakarta.inject.Singleton;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -372,6 +379,55 @@ public void ensureCleanJobState(final EnsureCleanJobStateInput input) {
failNonTerminalJobs(input.getConnectionId());
}

@Override
public boolean isLastJobOrAttemptFailure(JobCheckFailureInput input) {
final int limit = 2;
boolean lastAttemptCheck = false;
boolean lastJobCheck = false;

Set<JobConfig.ConfigType> configTypes = new HashSet<>();
configTypes.add(SYNC);

try {
List<Job> jobList = jobPersistence.listJobsIncludingId(configTypes, input.getConnectionId().toString(), input.getJobId(), limit);
Optional<Job> optionalActiveJob = jobList.stream().filter(job -> job.getId() == input.getJobId()).findFirst();
if (optionalActiveJob.isPresent()) {
lastAttemptCheck = checkActiveJobPreviousAttempt(optionalActiveJob.get(), input.getAttemptId());
}

OptionalLong previousJobId = getPreviousJobId(input.getJobId(), jobList.stream().map(Job::getId).toList());
if (previousJobId.isPresent()) {
Optional<Job> optionalPreviousJob = jobList.stream().filter(job -> job.getId() == previousJobId.getAsLong()).findFirst();
if (optionalPreviousJob.isPresent()) {
lastJobCheck = optionalPreviousJob.get().getStatus().equals(io.airbyte.persistence.job.models.JobStatus.FAILED);
}
}

return lastJobCheck || lastAttemptCheck;
} catch (final IOException e) {
throw new RetryableException(e);
}
}

private OptionalLong getPreviousJobId(Long activeJobId, List<Long> jobIdsList) {
return jobIdsList.stream()
.filter(jobId -> !Objects.equals(jobId, activeJobId))
.mapToLong(jobId -> jobId).max();
}

private boolean checkActiveJobPreviousAttempt(Job activeJob, int attemptId) {
final int minAttemptSize = 1;
boolean result = false;

if (activeJob.getAttempts().size() > minAttemptSize) {
Optional<Attempt> optionalAttempt = activeJob.getAttempts().stream()
.filter(attempt -> attempt.getId() == (attemptId - 1)).findFirst();
result = optionalAttempt.isPresent() && optionalAttempt.get().getStatus().equals(FAILED);
}

return result;
}

private void failNonTerminalJobs(final UUID connectionId) {
try {
final List<Job> jobs = jobPersistence.listJobsForConnectionWithStatuses(connectionId, Job.REPLICATION_TYPES,
Expand Down
Loading

0 comments on commit d7808d1

Please sign in to comment.