From c6715bd98a457d7f1adf879fbbb6fe570f8a5a19 Mon Sep 17 00:00:00 2001 From: Anne <102554163+alovew@users.noreply.github.com> Date: Wed, 4 Jan 2023 13:24:26 -0800 Subject: [PATCH] Remove config repo dependency for getStatus (#21033) --- .../scheduling/activities/ConfigFetchActivity.java | 4 ++-- .../activities/ConfigFetchActivityImpl.java | 11 +++++++---- .../workers/temporal/sync/SyncWorkflowTest.java | 6 +++--- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java index f936f9d93788..214cb323cd2e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java @@ -4,8 +4,8 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.validation.json.JsonValidationException; import io.temporal.activity.ActivityInterface; @@ -25,7 +25,7 @@ public interface ConfigFetchActivity { Optional getSourceId(UUID connectionId); @ActivityMethod - Optional getStatus(UUID connectionId); + Optional getStatus(UUID connectionId); @Data @NoArgsConstructor diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 8eb3e53a82b5..27df5e708f25 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -12,6 +12,7 @@ import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; import io.airbyte.config.Cron; @@ -248,11 +249,13 @@ public Optional getSourceId(final UUID connectionId) { } @Override - public Optional getStatus(final UUID connectionId) { + public Optional getStatus(final UUID connectionId) { try { - final StandardSync standardSync = getStandardSync(connectionId); - return Optional.ofNullable(standardSync.getStatus()); - } catch (final JsonValidationException | ConfigNotFoundException | IOException e) { + final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody = + new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId); + final ConnectionRead connectionRead = connectionApi.getConnection(requestBody); + return Optional.ofNullable(connectionRead.getStatus()); + } catch (ApiException e) { log.info("Encountered an error fetching the connection's status: ", e); return Optional.empty(); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 15c4a7b87a9c..a6af4e8abafa 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.api.client.model.generated.ConnectionStatus; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.temporal.scheduling.SyncWorkflow; @@ -26,7 +27,6 @@ import io.airbyte.config.OperatorWebhookInput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSync; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; @@ -159,7 +159,7 @@ void setUp() { when(configFetchActivity.getSourceId(sync.getConnectionId())).thenReturn(Optional.of(SOURCE_ID)); when(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)).thenReturn(true); - when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(Status.ACTIVE)); + when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(ConnectionStatus.ACTIVE)); longActivityOptions = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(3)) @@ -405,7 +405,7 @@ void testWebhookOperation() { @Test @Disabled("Temporarily disabled to address OC issue #1210") void testSkipReplicationAfterRefreshSchema() { - when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE)); + when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(ConnectionStatus.INACTIVE)); final StandardSyncOutput output = execute(); verifyShouldRefreshSchema(refreshSchemaActivity); verifyRefreshSchema(refreshSchemaActivity, sync);