Skip to content

Commit

Permalink
use api to do jobpersistence query (#18308)
Browse files Browse the repository at this point in the history
* use api to do jobpersistence query

* renaming some variables

* fix test
  • Loading branch information
xiaohansong authored and nataly committed Nov 3, 2022
1 parent ebc282a commit 72dae30
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 45 deletions.
41 changes: 41 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,26 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/jobs/get_normalization_status:
post:
tags:
- jobs
- internal
summary: Get normalization status to determine if we can bypass normalization phase
operationId: getAttemptNormalizationStatusesForJob
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/JobIdRequestBody"
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/AttemptNormalizationStatusReadList"

/v1/health:
get:
tags:
Expand Down Expand Up @@ -2244,6 +2264,7 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"

components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -4840,6 +4861,26 @@ components:
properties:
succeeded:
type: boolean
AttemptNormalizationStatusReadList:
type: object
properties:
attemptNormalizationStatuses:
type: array
items:
$ref: "#/components/schemas/AttemptNormalizationStatusRead"
AttemptNormalizationStatusRead:
type: object
properties:
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
hasRecordsCommitted:
type: boolean
recordsCommitted:
type: integer
format: int64
hasNormalizationFailed:
type: boolean

InvalidInputProperty:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

import java.util.Optional;

public record AttemptNormalizationStatus(long attemptNumber, Optional<Long> recordsCommitted, boolean normalizationFailed) {}
public record AttemptNormalizationStatus(int attemptNumber, Optional<Long> recordsCommitted, boolean normalizationFailed) {}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.server.apis;

import io.airbyte.analytics.TrackingClient;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckOperationRead;
import io.airbyte.api.model.generated.CompleteDestinationOAuthRequest;
Expand Down Expand Up @@ -789,6 +790,11 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
return execute(() -> jobHistoryHandler.getJobDebugInfo(jobIdRequestBody));
}

/**
 * Returns the per-attempt normalization statuses for the job identified by the request body.
 *
 * The lookup itself is performed by {@code jobHistoryHandler.getAttemptNormalizationStatuses};
 * this method only routes that call through {@code execute}.
 *
 * @param jobIdRequestBody request body carrying the job to look up
 * @return the list wrapper of attempt normalization statuses produced by the handler
 */
@Override
public AttemptNormalizationStatusReadList getAttemptNormalizationStatusesForJob(final JobIdRequestBody jobIdRequestBody) {
// NOTE(review): execute(...) is defined outside this view; presumably it translates handler exceptions into API errors -- confirm.
return execute(() -> jobHistoryHandler.getAttemptNormalizationStatuses(jobIdRequestBody));
}

@Override
public File getLogs(final LogsRequestBody logsRequestBody) {
return execute(() -> logsHandler.getLogs(workspaceRoot, workerEnvironment, logConfigs, logsRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.api.model.generated.AttemptFailureSummary;
import io.airbyte.api.model.generated.AttemptFailureType;
import io.airbyte.api.model.generated.AttemptInfoRead;
import io.airbyte.api.model.generated.AttemptNormalizationStatusRead;
import io.airbyte.api.model.generated.AttemptRead;
import io.airbyte.api.model.generated.AttemptStats;
import io.airbyte.api.model.generated.AttemptStatus;
Expand Down Expand Up @@ -38,6 +39,7 @@
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.server.scheduler.SynchronousJobMetadata;
import io.airbyte.server.scheduler.SynchronousResponse;
Expand Down Expand Up @@ -240,4 +242,13 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met
.logs(getLogRead(metadata.getLogPath()));
}

/**
 * Converts the persistence-layer normalization status of a single attempt into its API model.
 *
 * The optional committed-record count is flattened into two fields: {@code hasRecordsCommitted}
 * tells whether a count was recorded at all, and {@code recordsCommitted} carries the count, or
 * {@code 0} when none was recorded.
 *
 * @param databaseStatus status as read from job persistence
 * @return the equivalent API read model
 */
public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus(
    AttemptNormalizationStatus databaseStatus) {
  // Read the optional count once; it drives both the presence flag and the value.
  final var committedCount = databaseStatus.recordsCommitted();

  final AttemptNormalizationStatusRead apiStatus = new AttemptNormalizationStatusRead();
  apiStatus.attemptNumber(databaseStatus.attemptNumber());
  apiStatus.hasRecordsCommitted(committedCount.isPresent());
  apiStatus.recordsCommitted(committedCount.orElse(0L));
  apiStatus.hasNormalizationFailed(databaseStatus.normalizationFailed());
  return apiStatus;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.server.handlers;

import com.google.common.base.Preconditions;
import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.model.generated.DestinationDefinitionRead;
Expand Down Expand Up @@ -146,6 +147,12 @@ public List<JobRead> getLatestSyncJobsForConnections(final List<UUID> connection
.collect(Collectors.toList());
}

/**
 * Loads the normalization status of every attempt of a job and wraps them in the API list model.
 *
 * @param jobIdRequestBody request body whose id selects the job
 * @return list wrapper holding one converted status per entry returned by job persistence
 * @throws IOException if the job persistence query fails
 */
public AttemptNormalizationStatusReadList getAttemptNormalizationStatuses(final JobIdRequestBody jobIdRequestBody) throws IOException {
  final var persistedStatuses = jobPersistence.getAttemptNormalizationStatusesForJob(jobIdRequestBody.getId());

  // Translate each persistence record into its API counterpart.
  final var apiStatuses = persistedStatuses.stream()
      .map(JobConverter::convertAttemptNormalizationStatus)
      .collect(Collectors.toList());

  return new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(apiStatuses);
}

public List<JobRead> getRunningSyncJobForConnections(final List<UUID> connectionIds) throws IOException {
return jobPersistence.getRunningSyncJobForConnections(connectionIds).stream()
.map(JobConverter::getJobRead)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.Attempt;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.persistence.job.models.AttemptStatus;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.persistence.job.models.JobStatus;
Expand Down Expand Up @@ -376,4 +377,19 @@ void testEnumConversion() {
assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class));
}

@Test
@DisplayName("Should return attempt normalization info for the job")
void testGetAttemptNormalizationStatuses() throws IOException {
  // Arrange: persistence reports one attempt with 10 committed records and no normalization failure.
  final AttemptNormalizationStatus persistedStatus =
      new AttemptNormalizationStatus(1, Optional.of(10L), /* hasNormalizationFailed= */ false);
  when(jobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(persistedStatus));

  final AttemptNormalizationStatusRead expectedRead = new AttemptNormalizationStatusRead()
      .attemptNumber(1)
      .hasRecordsCommitted(true)
      .recordsCommitted(10L)
      .hasNormalizationFailed(false);
  final AttemptNormalizationStatusReadList expectedList =
      new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(expectedRead));

  // Act: ask the handler for the statuses of the same job.
  final AttemptNormalizationStatusReadList actualList =
      jobHistoryHandler.getAttemptNormalizationStatuses(new JobIdRequestBody().id(JOB_ID));

  // Assert: the handler returns the converted API model.
  assertEquals(expectedList, actualList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.util.Optional;

@ActivityInterface
public interface NormalizationSummaryCheckActivity {

@ActivityMethod
boolean shouldRunNormalization(Long jobId, Long attemptId, Optional<Long> numCommittedRecords) throws IOException;
boolean shouldRunNormalization(Long jobId, Long attemptId, Optional<Long> numCommittedRecords);

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.temporal.activity.Activity;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -25,54 +27,56 @@
@Singleton
public class NormalizationSummaryCheckActivityImpl implements NormalizationSummaryCheckActivity {

private final Optional<JobPersistence> jobPersistence;
private final AirbyteApiClient airbyteApiClient;

public NormalizationSummaryCheckActivityImpl(final Optional<JobPersistence> jobPersistence) {
this.jobPersistence = jobPersistence;
public NormalizationSummaryCheckActivityImpl(AirbyteApiClient airbyteApiClient) {
this.airbyteApiClient = airbyteApiClient;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) throws IOException {
public boolean shouldRunNormalization(final Long jobId, final Long attemptNumber, final Optional<Long> numCommittedRecords) {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));

// if job persistence is unavailable, default to running normalization
if (jobPersistence.isEmpty()) {
return true;
}

// if the count of committed records for this attempt is > 0 OR if it is null,
// then we should run normalization
if (numCommittedRecords.isEmpty() || numCommittedRecords.get() > 0) {
return true;
}

final List<AttemptNormalizationStatus> attemptNormalizationStatuses = jobPersistence.get().getAttemptNormalizationStatusesForJob(jobId);
final AttemptNormalizationStatusReadList AttemptNormalizationStatusReadList;
try {
AttemptNormalizationStatusReadList = airbyteApiClient.getJobsApi().getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(jobId));
} catch (ApiException e) {
throw Activity.wrap(e);
}
final AtomicLong totalRecordsCommitted = new AtomicLong(0L);
final AtomicBoolean shouldReturnTrue = new AtomicBoolean(false);

attemptNormalizationStatuses.stream().sorted(Comparator.comparing(AttemptNormalizationStatus::attemptNumber).reversed()).toList()
AttemptNormalizationStatusReadList.getAttemptNormalizationStatuses().stream().sorted(Comparator.comparing(
AttemptNormalizationStatusRead::getAttemptNumber).reversed()).toList()
.forEach(n -> {
if (n.attemptNumber() == attemptNumber) {
// Have to cast it because attemptNumber is read from JobRunConfig.
if (n.getAttemptNumber().intValue() == attemptNumber) {
return;
}

// if normalization succeeded on a previous attempt,
// we can stop looking for previous attempts
if (!n.normalizationFailed()) {
if (!n.getHasNormalizationFailed()) {
return;
}

// if normalization failed on past attempt, add number of records committed on that attempt to total
// committed number
// if there is no data recorded for the number of committed records, we should assume that there
// were committed records and run normalization
if (n.recordsCommitted().isEmpty()) {
if (!n.getHasRecordsCommitted()) {
shouldReturnTrue.set(true);
return;
} else if (n.recordsCommitted().get() != 0L) {
totalRecordsCommitted.addAndGet(n.recordsCommitted().get());
} else if (n.getRecordsCommitted().longValue() != 0L) {
totalRecordsCommitted.addAndGet(n.getRecordsCommitted());
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -92,7 +91,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
try {
shouldRun = normalizationSummaryCheckActivity.shouldRunNormalization(Long.valueOf(jobRunConfig.getJobId()), jobRunConfig.getAttemptId(),
Optional.ofNullable(syncOutput.getStandardSyncSummary().getTotalStats().getRecordsCommitted()));
} catch (final IOException e) {
} catch (final Exception e) {
shouldRun = true;
}
if (!shouldRun) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@
package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.models.AttemptNormalizationStatus;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusRead;
import io.airbyte.api.client.model.generated.AttemptNormalizationStatusReadList;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.workers.temporal.sync.NormalizationSummaryCheckActivityImpl;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@Slf4j
Expand All @@ -26,47 +29,62 @@ class NormalizationSummaryCheckActivityTest {

private static final Long JOB_ID = 10L;
static private NormalizationSummaryCheckActivityImpl normalizationSummaryCheckActivity;
static private JobPersistence mJobPersistence;
static private AirbyteApiClient airbyteApiClient;
static private JobsApi jobsApi;

@BeforeAll
static void setUp() {
mJobPersistence = mock(JobPersistence.class);
normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(Optional.of(mJobPersistence));
airbyteApiClient = mock(AirbyteApiClient.class);
jobsApi = mock(JobsApi.class);
when(airbyteApiClient.getJobsApi()).thenReturn(jobsApi);
normalizationSummaryCheckActivity = new NormalizationSummaryCheckActivityImpl(airbyteApiClient);
}

@Test
void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws IOException {
void testShouldRunNormalizationRecordsCommittedOnFirstAttemptButNotCurrentAttempt() throws ApiException {
// Attempt 1 committed records, but normalization failed
// Attempt 2 did not commit records, normalization failed (or did not run)
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), true);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
final AttemptNormalizationStatusRead attempt1 =
new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(10L).hasNormalizationFailed(true);
final AttemptNormalizationStatusRead attempt2 =
new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true);

when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID)))
.thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2)));

Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

@Test
void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws IOException {
void testShouldRunNormalizationRecordsCommittedOnCurrentAttempt() throws ApiException {
Assertions.assertThat(true).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(30L)));
}

@Test
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws IOException {
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptOrPreviousAttempts() throws ApiException {
// No attempts committed any records
// Normalization did not run on any attempts
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(0L), true);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(0L), true);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
final AttemptNormalizationStatusRead attempt1 =
new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true);
final AttemptNormalizationStatusRead attempt2 =
new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(0L).hasNormalizationFailed(true);

when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID)))
.thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2)));
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

@Test
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws IOException {
void testShouldRunNormalizationNoRecordsCommittedOnCurrentAttemptPreviousAttemptsSucceeded() throws ApiException {
// Records committed on first two attempts and normalization succeeded
// No records committed on current attempt and normalization has not yet run
final AttemptNormalizationStatus attempt1 = new AttemptNormalizationStatus(1, Optional.of(10L), false);
final AttemptNormalizationStatus attempt2 = new AttemptNormalizationStatus(2, Optional.of(20L), false);
Mockito.when(mJobPersistence.getAttemptNormalizationStatusesForJob(JOB_ID)).thenReturn(List.of(attempt1, attempt2));
final AttemptNormalizationStatusRead attempt1 =
new AttemptNormalizationStatusRead().attemptNumber(1).hasRecordsCommitted(true).recordsCommitted(10L).hasNormalizationFailed(false);
final AttemptNormalizationStatusRead attempt2 =
new AttemptNormalizationStatusRead().attemptNumber(2).hasRecordsCommitted(true).recordsCommitted(20L).hasNormalizationFailed(false);

when(jobsApi.getAttemptNormalizationStatusesForJob(new JobIdRequestBody().id(JOB_ID)))
.thenReturn(new AttemptNormalizationStatusReadList().attemptNormalizationStatuses(List.of(attempt1, attempt2)));
Assertions.assertThat(false).isEqualTo(normalizationSummaryCheckActivity.shouldRunNormalization(JOB_ID, 3L, Optional.of(0L)));
}

Expand Down
Loading

0 comments on commit 72dae30

Please sign in to comment.