Skip to content

Commit

Permalink
Use ConnectionApi to fetch source ID (#20670)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Dec 19, 2022
1 parent c0838f8 commit ab36efb
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
Expand Down Expand Up @@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -51,15 +54,18 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
private final JobPersistence jobPersistence;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -179,9 +185,11 @@ public GetMaxAttemptOutput getMaxAttempt() {
@Override
public Optional<UUID> getSourceId(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getSourceId());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -48,6 +49,9 @@ class ConfigFetchActivityTest {
@Mock
private Job mJob;

@Mock
private ConnectionApi mConnectionApi;

private ConfigFetchActivityImpl configFetchActivity;

private final static UUID connectionId = UUID.randomUUID();
Expand Down Expand Up @@ -93,7 +97,8 @@ class ConfigFetchActivityTest {
@BeforeEach
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond());
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond(),
mConnectionApi);
}

@Nested
Expand Down Expand Up @@ -161,7 +166,8 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
@Test
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -183,7 +189,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@Test
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -238,7 +245,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -267,7 +274,8 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
mockRightNow.set(Calendar.MILLISECOND, 0);

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L,
mConnectionApi);

Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));
Expand All @@ -293,7 +301,8 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
mockRightNow.set(Calendar.MILLISECOND, 0);

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L);
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L,
mConnectionApi);

Mockito.when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Mockito.when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -317,7 +326,8 @@ class TestGetMaxAttempt {
@DisplayName("Test that we are using to right service to get the maximum amount of attempt")
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond());
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down

0 comments on commit ab36efb

Please sign in to comment.