Skip to content

Commit

Permalink
Use ConnectionApi to fetch source ID (#20670)
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Dec 21, 2022
1 parent 16e890a commit eee6193
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
Expand Down Expand Up @@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -65,25 +68,29 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
private final WorkspaceHelper workspaceHelper;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier);
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, connectionApi);
}

@VisibleForTesting
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -229,9 +236,11 @@ public GetMaxAttemptOutput getMaxAttempt() {
@Override
public Optional<UUID> getSourceId(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getSourceId());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -55,6 +55,9 @@ class ConfigFetchActivityTest {
@Mock
private Job mJob;

@Mock
private ConnectionApi mConnectionApi;

private ConfigFetchActivityImpl configFetchActivity;

private final static UUID connectionId = UUID.randomUUID();
Expand Down Expand Up @@ -102,7 +105,7 @@ class ConfigFetchActivityTest {
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> Instant.now().getEpochSecond());
() -> Instant.now().getEpochSecond(), mConnectionApi);
}

@Nested
Expand Down Expand Up @@ -170,7 +173,8 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
@Test
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -192,7 +196,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@Test
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -247,7 +252,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -279,7 +284,7 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));
Expand Down Expand Up @@ -308,7 +313,7 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
Expand Down Expand Up @@ -338,7 +343,7 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -362,7 +367,8 @@ class TestGetMaxAttempt {
@DisplayName("Test that we are using to right service to get the maximum amount of attempt")
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond());
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down

0 comments on commit eee6193

Please sign in to comment.