From 8e8dd9938b880a4a07b0c20b8ebf083fe430eb1a Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 18:47:01 -0500 Subject: [PATCH 1/6] Disable auto detect schema activity bits --- .../workers/config/ActivityBeanFactory.java | 8 ++-- .../activities/ConfigFetchActivityImpl.java | 3 ++ .../temporal/sync/SyncWorkflowImpl.java | 39 +++++++++---------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index 7c333e8a0b75e..c9cd4a008b397 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -58,8 +58,8 @@ public List checkConnectionActivities( @Requires(env = WorkerMode.CONTROL_PLANE) @Named("notifyActivities") public List notifyActivities(final NotifySchemaChangeActivity notifySchemaChangeActivity, - SlackConfigActivity slackConfigActivity, - ConfigFetchActivity configFetchActivity) { + final SlackConfigActivity slackConfigActivity, + final ConfigFetchActivity configFetchActivity) { return List.of(notifySchemaChangeActivity, slackConfigActivity, configFetchActivity); } @@ -112,10 +112,10 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - final ConfigFetchActivity configFetchActivity, + /* final ConfigFetchActivity configFetchActivity, */ final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, - webhookOperationActivity, configFetchActivity, refreshSchemaActivity); + webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 6e48ccfef5a9a..514a56eba4f95 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -8,6 +8,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; +import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; import io.airbyte.config.Cron; import io.airbyte.config.StandardSync; @@ -20,6 +21,7 @@ import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; +import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; import jakarta.inject.Named; import jakarta.inject.Singleton; @@ -39,6 +41,7 @@ @Slf4j @Singleton +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConfigFetchActivityImpl implements ConfigFetchActivity { private final static long MS_PER_SECOND = 1000L; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index e0c4b0f55af8e..772c994098160 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -17,14 +17,10 @@ import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; import io.airbyte.config.OperatorWebhookInput; -import io.airbyte.config.StandardSync.Status; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.StandardSyncSummary; -import io.airbyte.config.StandardSyncSummary.ReplicationStatus; -import io.airbyte.config.SyncStats; import io.airbyte.config.WebhookOperationSummary; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; @@ -87,23 +83,24 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int autoDetectSchemaVersion = Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - final Optional sourceId = configFetchActivity.getSourceId(connectionId); - - if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - LOGGER.info("Refreshing source schema..."); - refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - } - - final Optional status = configFetchActivity.getStatus(connectionId); - if (!status.isEmpty() && Status.INACTIVE == status.get()) { - LOGGER.info("Connection is disabled. Cancelling run."); - final StandardSyncOutput output = - new StandardSyncOutput() - .withStandardSyncSummary(new StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - return output; - } - } + // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && Status.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + // } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); From b13df68905ef73107df6fa0f69cec6b53328c90d Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 18:51:13 -0500 Subject: [PATCH 2/6] Disable impacted tests --- .../io/airbyte/workers/temporal/sync/SyncWorkflowTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 6ffac60aeef3f..a55ee59329367 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -237,6 +237,7 @@ void testSuccess() { } @Test + @Disabled void testReplicationFailure() { doThrow(new IllegalArgumentException("induced exception")).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -255,6 +256,7 @@ void testReplicationFailure() { } @Test + @Disabled void testReplicationFailedGracefully() { doReturn(replicationFailOutput).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -282,6 +284,7 @@ void testReplicationFailedGracefully() { } @Test + @Disabled void testNormalizationFailure() { doReturn(replicationSuccessOutput).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -305,6 +308,7 @@ void testNormalizationFailure() { } @Test + @Disabled void testCancelDuringReplication() { doAnswer(ignored -> { cancelWorkflow(); @@ -399,7 +403,7 @@ void testWebhookOperation() { @Test void testSkipReplicationAfterRefreshSchema() { when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE)); - StandardSyncOutput output = execute(); + final StandardSyncOutput output = execute(); verifyShouldRefreshSchema(refreshSchemaActivity); verifyRefreshSchema(refreshSchemaActivity, sync); verifyNoInteractions(replicationActivity); From 4dcdafc09982e8b43e95f2ee1d121faa234a5286 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 19:09:09 -0500 Subject: [PATCH 3/6] Disable auto detect schema checks --- .../temporal/sync/SyncWorkflowTest.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index a55ee59329367..ac94875749022 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,15 +229,14 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); } @Test - @Disabled void testReplicationFailure() { doThrow(new IllegalArgumentException("induced exception")).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -247,8 +246,8 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -256,7 +255,6 @@ void testReplicationFailure() { } @Test - @Disabled void testReplicationFailedGracefully() { doReturn(replicationFailOutput).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -271,8 +269,8 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -284,7 +282,6 @@ void testReplicationFailedGracefully() { } @Test - @Disabled void testNormalizationFailure() { doReturn(replicationSuccessOutput).when(replicationActivity).replicate( JOB_RUN_CONFIG, @@ -299,8 +296,8 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -308,7 +305,6 @@ void testNormalizationFailure() { } @Test - @Disabled void testCancelDuringReplication() { doAnswer(ignored -> { cancelWorkflow(); @@ -321,8 +317,8 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -347,8 +343,8 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); - verifyShouldRefreshSchema(refreshSchemaActivity); - verifyRefreshSchema(refreshSchemaActivity, sync); +// verifyShouldRefreshSchema(refreshSchemaActivity); +// verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -401,6 +397,7 @@ void testWebhookOperation() { } @Test + @Disabled void testSkipReplicationAfterRefreshSchema() { when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE)); final StandardSyncOutput output = execute(); From ae559b881aa8c6f3c362ebbd5a1ce383d6d6143b Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 19:12:42 -0500 Subject: [PATCH 4/6] Add comment as to why code has been disabled --- .../workers/config/ActivityBeanFactory.java | 5 ++- .../temporal/sync/SyncWorkflowImpl.java | 1 + .../temporal/sync/SyncWorkflowTest.java | 32 +++++++++++-------- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index c9cd4a008b397..b91b1f38a3c3c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -112,7 +112,10 @@ public List syncActivities( final PersistStateActivity persistStateActivity, final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, - /* final ConfigFetchActivity configFetchActivity, */ + /* + * Temporarily disabled to address OC issue #1210 + * final ConfigFetchActivity configFetchActivity, + */ final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, webhookOperationActivity, /* configFetchActivity, */ refreshSchemaActivity); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 772c994098160..80ddf4ab706ef 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -83,6 +83,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int autoDetectSchemaVersion = Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); + // Temporarily disabled to address OC issue #1210 // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { // final Optional sourceId = configFetchActivity.getSourceId(connectionId); // diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index ac94875749022..15c4a7b87a9c4 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -229,8 +229,9 @@ void testSuccess() { verifyNormalize(normalizationActivity, normalizationInput); verifyDbtTransform(dbtTransformationActivity, syncInput.getResourceRequirements(), operatorDbtInput); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); assertEquals( replicationSuccessOutput.withNormalizationSummary(normalizationSummary).getStandardSyncSummary(), actualOutput.getStandardSyncSummary()); @@ -246,8 +247,9 @@ void testReplicationFailure() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -269,8 +271,9 @@ void testReplicationFailedGracefully() { final StandardSyncOutput actualOutput = execute(); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationFailOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -296,8 +299,9 @@ void testNormalizationFailure() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -317,8 +321,9 @@ void testCancelDuringReplication() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyNoInteractions(persistStateActivity); verifyNoInteractions(normalizationActivity); @@ -343,8 +348,9 @@ void testCancelDuringNormalization() { assertThrows(WorkflowFailedException.class, this::execute); -// verifyShouldRefreshSchema(refreshSchemaActivity); -// verifyRefreshSchema(refreshSchemaActivity, sync); + // Temporarily disabled to address OC issue #1210 + // verifyShouldRefreshSchema(refreshSchemaActivity); + // verifyRefreshSchema(refreshSchemaActivity, sync); verifyReplication(replicationActivity, syncInput); verifyPersistState(persistStateActivity, sync, replicationSuccessOutput, syncInput.getCatalog()); verifyNormalize(normalizationActivity, normalizationInput); @@ -397,7 +403,7 @@ void testWebhookOperation() { } @Test - @Disabled + @Disabled("Temporarily disabled to address OC issue #1210") void testSkipReplicationAfterRefreshSchema() { when(configFetchActivity.getStatus(any())).thenReturn(Optional.of(Status.INACTIVE)); final StandardSyncOutput output = execute(); From ebf83a7ddd784202b0063b34ce2c24c29056183b Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 19:27:52 -0500 Subject: [PATCH 5/6] Fix PMD warnings --- .../workers/config/ActivityBeanFactory.java | 3 +- .../temporal/sync/SyncWorkflowImpl.java | 48 +++++++++---------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index b91b1f38a3c3c..5a5a55332009d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -113,8 +113,7 @@ public List syncActivities( final NormalizationSummaryCheckActivity normalizationSummaryCheckActivity, final WebhookOperationActivity webhookOperationActivity, /* - * Temporarily disabled to address OC issue #1210 - * final ConfigFetchActivity configFetchActivity, + * Temporarily disabled to address OC issue #1210 final ConfigFetchActivity configFetchActivity, */ final RefreshSchemaActivity refreshSchemaActivity) { return List.of(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity, normalizationSummaryCheckActivity, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 80ddf4ab706ef..1817dba91c1bd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -27,7 +27,6 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; -import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.temporal.workflow.Workflow; import java.util.Map; import java.util.Optional; @@ -58,10 +57,11 @@ public class SyncWorkflowImpl implements SyncWorkflow { private NormalizationSummaryCheckActivity normalizationSummaryCheckActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private WebhookOperationActivity webhookOperationActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private RefreshSchemaActivity refreshSchemaActivity; - @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") - private ConfigFetchActivity configFetchActivity; + // Temporarily disabled to address OC issue #1210 + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private RefreshSchemaActivity refreshSchemaActivity; + // @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") + // private ConfigFetchActivity configFetchActivity; @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) @Override @@ -83,25 +83,25 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int autoDetectSchemaVersion = Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - // Temporarily disabled to address OC issue #1210 - // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - // final Optional sourceId = configFetchActivity.getSourceId(connectionId); - // - // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - // LOGGER.info("Refreshing source schema..."); - // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - // } - // - // final Optional status = configFetchActivity.getStatus(connectionId); - // if (!status.isEmpty() && Status.INACTIVE == status.get()) { - // LOGGER.info("Connection is disabled. Cancelling run."); - // final StandardSyncOutput output = - // new StandardSyncOutput() - // .withStandardSyncSummary(new - // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - // return output; - // } - // } + if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // Temporarily disabled to address OC issue #1210 + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && Status.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue); From e6249981465b19833de100a913f5687edd60cdf8 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 16 Dec 2022 19:31:33 -0500 Subject: [PATCH 6/6] Fix PMD warning --- .../temporal/sync/SyncWorkflowImpl.java | 40 ++++++++++--------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 1817dba91c1bd..5c4c19245ed73 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -80,28 +80,30 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final int version = Workflow.getVersion(VERSION_LABEL, Workflow.DEFAULT_VERSION, CURRENT_VERSION); final String taskQueue = Workflow.getInfo().getTaskQueue(); + // Temporarily suppressed to address OC issue #1210 + @SuppressWarnings("PMD.UnusedLocalVariable") final int autoDetectSchemaVersion = Workflow.getVersion(AUTO_DETECT_SCHEMA_TAG, Workflow.DEFAULT_VERSION, AUTO_DETECT_SCHEMA_VERSION); - if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { - // Temporarily disabled to address OC issue #1210 - // final Optional sourceId = configFetchActivity.getSourceId(connectionId); - // - // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { - // LOGGER.info("Refreshing source schema..."); - // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); - // } - // - // final Optional status = configFetchActivity.getStatus(connectionId); - // if (!status.isEmpty() && Status.INACTIVE == status.get()) { - // LOGGER.info("Connection is disabled. Cancelling run."); - // final StandardSyncOutput output = - // new StandardSyncOutput() - // .withStandardSyncSummary(new - // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); - // return output; - // } - } + // Temporarily disabled to address OC issue #1210 + // if (autoDetectSchemaVersion >= AUTO_DETECT_SCHEMA_VERSION) { + // final Optional sourceId = configFetchActivity.getSourceId(connectionId); + // + // if (!sourceId.isEmpty() && refreshSchemaActivity.shouldRefreshSchema(sourceId.get())) { + // LOGGER.info("Refreshing source schema..."); + // refreshSchemaActivity.refreshSchema(sourceId.get(), connectionId); + // } + // + // final Optional status = configFetchActivity.getStatus(connectionId); + // if (!status.isEmpty() && Status.INACTIVE == status.get()) { + // LOGGER.info("Connection is disabled. Cancelling run."); + // final StandardSyncOutput output = + // new StandardSyncOutput() + // .withStandardSyncSummary(new + // StandardSyncSummary().withStatus(ReplicationStatus.CANCELLED).withTotalStats(new SyncStats())); + // return output; + // } + // } StandardSyncOutput syncOutput = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput, taskQueue);