diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index f7b574f1e96b..5681bbdbcc70 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2171,6 +2171,26 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + /v1/jobs/get_normalization_status: + post: + tags: + - jobs + - internal + summary: Get normalization status to determine if we can bypass normalization phase + operationId: getAttemptNormalizationStatusesForJob + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/JobIdRequestBody" + responses: + "200": + description: Successful operation + content: + application/json: + schema: + $ref: "#/components/schemas/AttemptNormalizationStatusReadList" + /v1/health: get: tags: @@ -2244,6 +2264,7 @@ paths: application/json: schema: $ref: "#/components/schemas/InternalOperationResult" + components: securitySchemes: bearerAuth: @@ -4840,6 +4861,26 @@ components: properties: succeeded: type: boolean + AttemptNormalizationStatusReadList: + type: object + properties: + attemptNormalizationStatuses: + type: array + items: + $ref: "#/components/schemas/AttemptNormalizationStatusRead" + AttemptNormalizationStatusRead: + type: object + properties: + attemptNumber: + $ref: "#/components/schemas/AttemptNumber" + hasRecordsCommitted: + type: boolean + recordsCommitted: + type: integer + format: int64 + hasNormalizationFailed: + type: boolean + InvalidInputProperty: type: object required: diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java index 9575bb8e9968..4c106dd654ab 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/AttemptNormalizationStatus.java @@ -6,4 +6,4 @@ import java.util.Optional; -public record AttemptNormalizationStatus(long attemptNumber, Optional recordsCommitted, boolean normalizationFailed) {} +public record AttemptNormalizationStatus(int attemptNumber, Optional recordsCommitted, boolean normalizationFailed) {} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 81fc1daff33b..cf2b731a246e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -5,6 +5,7 @@ package io.airbyte.server.apis; import io.airbyte.analytics.TrackingClient; +import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.CheckConnectionRead; import io.airbyte.api.model.generated.CheckOperationRead; import io.airbyte.api.model.generated.CompleteDestinationOAuthRequest; @@ -789,6 +790,11 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) return execute(() -> jobHistoryHandler.getJobDebugInfo(jobIdRequestBody)); } + @Override + public AttemptNormalizationStatusReadList getAttemptNormalizationStatusesForJob(final JobIdRequestBody jobIdRequestBody) { + return execute(() -> jobHistoryHandler.getAttemptNormalizationStatuses(jobIdRequestBody)); + } + @Override public File getLogs(final LogsRequestBody logsRequestBody) { return execute(() -> logsHandler.getLogs(workspaceRoot, workerEnvironment, logConfigs, logsRequestBody)); diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 614344230532..52c28f3640f1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -9,6 +9,7 @@ import io.airbyte.api.model.generated.AttemptFailureSummary; import io.airbyte.api.model.generated.AttemptFailureType; import io.airbyte.api.model.generated.AttemptInfoRead; +import io.airbyte.api.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.model.generated.AttemptRead; import io.airbyte.api.model.generated.AttemptStats; import io.airbyte.api.model.generated.AttemptStatus; @@ -38,6 +39,7 @@ import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.persistence.job.models.Attempt; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.Job; import io.airbyte.server.scheduler.SynchronousJobMetadata; import io.airbyte.server.scheduler.SynchronousResponse; @@ -240,4 +242,13 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met .logs(getLogRead(metadata.getLogPath())); } + public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus( + AttemptNormalizationStatus databaseStatus) { + return new AttemptNormalizationStatusRead() + .attemptNumber(databaseStatus.attemptNumber()) + .hasRecordsCommitted(!databaseStatus.recordsCommitted().isEmpty()) + .recordsCommitted(databaseStatus.recordsCommitted().orElse(0L)) + .hasNormalizationFailed(databaseStatus.normalizationFailed()); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index bb4f7bbb551f..e25bee37f04b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -5,6 +5,7 @@ package io.airbyte.server.handlers; import com.google.common.base.Preconditions; +import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -146,6 +147,12 @@ public List getLatestSyncJobsForConnections(final List connection .collect(Collectors.toList()); } + public AttemptNormalizationStatusReadList getAttemptNormalizationStatuses(final JobIdRequestBody jobIdRequestBody) throws IOException { + return new AttemptNormalizationStatusReadList() + .attemptNormalizationStatuses(jobPersistence.getAttemptNormalizationStatusesForJob(jobIdRequestBody.getId()).stream() + .map(JobConverter::convertAttemptNormalizationStatus).collect(Collectors.toList())); + } + public List getRunningSyncJobForConnections(final List connectionIds) throws IOException { return jobPersistence.getRunningSyncJobForConnections(connectionIds).stream() .map(JobConverter::getJobRead) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 3d4f6e942401..d53044e95596 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -20,6 +20,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.models.Attempt; +import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; @@ -376,4 +377,19 @@ void testEnumConversion() { assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class)); } + @Test + @DisplayName("Should return attempt normalization info for the job") + void testGetAttemptNormalizationStatuses() throws IOException { + + AttemptNormalizationStatus databaseReadResult = new AttemptNormalizationStatus(1, Optional.of(10L), /* hasNormalizationFailed= */ false); + + when(jobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(databaseReadResult)); + + AttemptNormalizationStatusReadList expectedStatus = new AttemptNormalizationStatusReadList().attemptNormalizationStatuses( + List.of(new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).hasNormalizationFailed(false).recordsCommitted(10L))); + + assertEquals(expectedStatus, jobHistoryHandler.getAttemptNormalizationStatuses(new JobIdRequestBody().id(JOB_ID))); + + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java index f4948483c187..9601c0cdf0d6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivity.java @@ -6,13 +6,12 @@ import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; -import java.io.IOException; import java.util.Optional; @ActivityInterface public interface NormalizationSummaryCheckActivity { @ActivityMethod - boolean shouldRunNormalization(Long jobId, Long attemptId, Optional numCommittedRecords) throws IOException; + boolean shouldRunNormalization(Long jobId, Long attemptId, Optional numCommittedRecords); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java index 8d508538f214..ad6a37961b06 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationSummaryCheckActivityImpl.java @@ -8,13 +8,15 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import datadog.trace.api.Trace; +import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead; +import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.metrics.lib.ApmTraceUtils; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.AttemptNormalizationStatus; +import io.temporal.activity.Activity; import jakarta.inject.Singleton; -import java.io.IOException; import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,42 +27,44 @@ @Singleton public class NormalizationSummaryCheckActivityImpl implements NormalizationSummaryCheckActivity { - private final Optional jobPersistence; + private final AirbyteApiClient airbyteApiClient; - public NormalizationSummaryCheckActivityImpl(final Optional jobPersistence) { - this.jobPersistence = jobPersistence; + public NormalizationSummaryCheckActivityImpl(AirbyteApiClient airbyteApiClient) { + this.airbyteApiClient = airbyteApiClient; } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") - public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional numCommittedRecords) throws IOException { + public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional numCommittedRecords) { ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId)); - // if job persistence is unavailable, default to running normalization - if (jobPersistence.isEmpty()) { - return true; - } - // if the count of committed records for this attempt is > 0 OR if it is null, // then we should run normalization if (numCommittedRecords.isEmpty() || numCommittedRecords.get() > 0) { return true; } - final List attemptNormalizationStatuses = jobPersistence.get().getAttemptNormalizationStatusesForJob(jobId); + final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList; + try { + AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId)); + } catch (ApiException e) { + throw Activity.wrap(e); + } final AtomicLong totalRecordsCommitted = new AtomicLong(0L); final AtomicBoolean shouldReturnTrue = new AtomicBoolean(false); - attemptNormalizationStatuses.stream().sorted(Comparator.comparing(AttemptNormalizationStatus::attemptNumber).reversed()).toList() + AttemptNormalizationStatusReadList.getAttemptNormalizationStatuses().stream().sorted(Comparator.comparing( + AttemptNormalizationStatusRead::getAttemptNumber).reversed()).toList() .forEach(n -> { - if (n.attemptNumber() == attemptNumber) { + // Have to cast it because attemptNumber is read from JobRunConfig. + if (n.getAttemptNumber().intValue() == attemptNumber) { return; } // if normalization succeeded from a previous attempt succeeded, // we can stop looking for previous attempts - if (!n.normalizationFailed()) { + if (!n.getHasNormalizationFailed()) { return; } @@ -68,11 +72,11 @@ public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber // committed number // if there is no data recorded for the number of committed records, we should assume that there // were committed records and run normalization - if (n.recordsCommitted().isEmpty()) { + if (!n.getHasRecordsCommitted()) { shouldReturnTrue.set(true); return; - } else if (n.recordsCommitted().get() != 0L) { - totalRecordsCommitted.addAndGet(n.recordsCommitted().get()); + } else if (n.getRecordsCommitted().longValue() != 0L) { + totalRecordsCommitted.addAndGet(n.getRecordsCommitted()); } }); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 8270db6c7493..d75dec375dc1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -27,7 +27,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.temporal.workflow.Workflow; -import java.io.IOException; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -92,7 +91,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, try { shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(), Optional.ofNullable(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted())); - } catch (final IOException e) { + } catch (final Exception e) { shouldRun = true; } if (!shouldRun) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java index 4ddd6039db9a..ba508937d5e1 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/NormalizationSummaryCheckActivityTest.java @@ -5,11 +5,15 @@ package io.airbyte.workers.temporal.scheduling.activities; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.AttemptNormalizationStatus; +import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.generated.JobsApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead; +import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivityImpl; -import java.io.IOException; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -17,7 +21,6 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @Slf4j @@ -26,47 +29,62 @@ class NormalizationSummaryCheckActivityTest { private static final Long JOB_ID = 10L; static private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity; - static private JobPersistence mJobPersistence; + static private AirbyteApiClient airbyteApiClient; + static private JobsApi jobsApi; @BeforeAll static void setUp() { - mJobPersistence = mock(JobPersistence.class); - normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(Optional.of(mJobPersistence)); + airbyteApiClient = mock(AirbyteApiClient.class); + jobsApi = mock(JobsApi.class); + when(airbyteApiClient.getJobsApi()).thenReturn(jobsApi); + normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(airbyteApiClient); } @Test - void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws IOException { + void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws ApiException { // Attempt 1 committed records, but normalization failed // Attempt 2 did not commit records, normalization failed (or did not run) - final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), true); - final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true); - Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + final AttemptNormalizationStatusRead attempt1 = + new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(10L).hasNormalizationFailed(true); + final AttemptNormalizationStatusRead attempt2 = + new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true); + + when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID))) + .thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2))); Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); } @Test - void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws IOException { + void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws ApiException { Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(30L))); } @Test - void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws IOException { + void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws ApiException { // No attempts committed any records // Normalization did not run on any attempts - final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(0L), true); - final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true); - Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + final AttemptNormalizationStatusRead attempt1 = + new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true); + final AttemptNormalizationStatusRead attempt2 = + new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true); + + when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID))) + .thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2))); Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); } @Test - void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws IOException { + void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws ApiException { // Records committed on first two attempts and normalization succeeded // No records committed on current attempt and normalization has not yet run - final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), false); - final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(20L), false); - Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2)); + final AttemptNormalizationStatusRead attempt1 = + new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(10L).hasNormalizationFailed(false); + final AttemptNormalizationStatusRead attempt2 = + new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(20L).hasNormalizationFailed(false); + + when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID))) + .thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2))); Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L))); } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 47c77b8a394d..48dd1c63f7be 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -280,11 +280,13 @@

Health

Internal

Jobs