From 1a25de26410a9ad60d01d42401e8992cc06a6f3b Mon Sep 17 00:00:00 2001 From: Cole Snodgrass Date: Tue, 10 Jan 2023 08:59:02 -0800 Subject: [PATCH] convert EnvVariableFeatureFlag to an injected dependency (#21171) * inject feature-flag client into integration-launcher * inject feature-flag client into message-tracker * inject feature-flag client into DefaultAirbyteSource * missed reference in message-tracker --- .../internal/DefaultAirbyteSource.java | 20 ++++++++----- .../book_keeping/AirbyteMessageTracker.java | 17 ++++++----- .../process/AirbyteIntegrationLauncher.java | 7 +++-- .../ReplicationWorkerPerformanceTest.java | 3 +- .../internal/DefaultAirbyteSourceTest.java | 14 +++++---- .../AirbyteMessageTrackerTest.java | 3 +- .../AirbyteIntegrationLauncherTest.java | 14 +++++---- .../commons/features/FeatureFlags.java | 2 +- .../ReplicationJobOrchestrator.java | 10 ++++--- .../DestinationAcceptanceTest.java | 9 +++--- .../source/AbstractSourceConnectorTest.java | 29 +++++++++++++------ .../CheckConnectionActivityImpl.java | 9 ++++-- .../catalog/DiscoverCatalogActivityImpl.java | 12 +++++--- .../temporal/spec/SpecActivityImpl.java | 9 ++++-- .../sync/ReplicationActivityImpl.java | 17 +++++++---- 15 files changed, 113 insertions(+), 62 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java index f834c33107fd..945306c8d209 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/DefaultAirbyteSource.java @@ -9,7 +9,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import datadog.trace.api.Trace; -import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; @@ -56,23 +56,27 @@ public class DefaultAirbyteSource implements AirbyteSource { private Process sourceProcess = null; private Iterator messageIterator = null; private Integer exitValue = null; - private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages(); + private final FeatureFlags featureFlags; - public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) { - this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER)); + public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final FeatureFlags featureFlags) { + this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), featureFlags); } - public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory) { - this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION)); + public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, + final AirbyteStreamFactory streamFactory, + final FeatureFlags featureFlags) { + this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION), featureFlags); } @VisibleForTesting DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory, - final HeartbeatMonitor heartbeatMonitor) { + final HeartbeatMonitor heartbeatMonitor, + final FeatureFlags featureFlags) { this.integrationLauncher = integrationLauncher; this.streamFactory = streamFactory; this.heartbeatMonitor = heartbeatMonitor; + this.featureFlags = featureFlags; } @Trace(operationName = WORKER_OPERATION_NAME) @@ -167,7 +171,7 @@ public void cancel() throws Exception { } private void logInitialStateAsJSON(final WorkerSourceConfig sourceConfig) { - if (!logConnectorMessages) { + if (!featureFlags.logConnectorMessages()) { return; } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java index 68399dff3860..794a3ff18f04 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java @@ -14,7 +14,7 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import datadog.trace.api.Trace; -import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.config.FailureReason; import io.airbyte.config.State; @@ -66,7 +66,7 @@ public class AirbyteMessageTracker implements MessageTracker { private final List destinationErrorTraceMessages; private final List sourceErrorTraceMessages; private final StateAggregator stateAggregator; - private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages(); + private final FeatureFlags featureFlags; // These variables support SYNC level estimates and are meant for sources where stream level // estimates are not possible e.g. CDC sources. @@ -91,16 +91,18 @@ private enum ConnectorType { DESTINATION } - public AirbyteMessageTracker() { + public AirbyteMessageTracker(final FeatureFlags featureFlags) { this(new StateDeltaTracker(STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES), - new DefaultStateAggregator(new EnvVariableFeatureFlags().useStreamCapableState()), - new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT)); + new DefaultStateAggregator(featureFlags.useStreamCapableState()), + new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT), + featureFlags); } @VisibleForTesting protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final StateAggregator stateAggregator, - final StateMetricsTracker stateMetricsTracker) { + final StateMetricsTracker stateMetricsTracker, + final FeatureFlags featureFlags) { this.sourceOutputState = new AtomicReference<>(); this.destinationOutputState = new AtomicReference<>(); this.streamToRunningCount = new HashMap<>(); @@ -115,6 +117,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, this.destinationErrorTraceMessages = new ArrayList<>(); this.sourceErrorTraceMessages = new ArrayList<>(); this.stateAggregator = stateAggregator; + this.featureFlags = featureFlags; } @Trace(operationName = WORKER_OPERATION_NAME) @@ -542,7 +545,7 @@ public Boolean getUnreliableStateTimingMetrics() { } private void logMessageAsJSON(final String caller, final AirbyteMessage message) { - if (!logConnectorMessages) { + if (!featureFlags.logConnectorMessages()) { return; } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 43393e2b03ad..6b8c9ac347cd 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -46,7 +46,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher { /** * If true, launcher will use a separated isolated pool to run the job. - * + *

* At this moment, we put custom connector jobs into an isolated pool. */ private final boolean useIsolatedPool; @@ -56,13 +56,14 @@ public AirbyteIntegrationLauncher(final String jobId, final String imageName, final ProcessFactory processFactory, final ResourceRequirements resourceRequirement, - final boolean useIsolatedPool) { + final boolean useIsolatedPool, + final FeatureFlags featureFlags) { this.jobId = jobId; this.attempt = attempt; this.imageName = imageName; this.processFactory = processFactory; this.resourceRequirement = resourceRequirement; - this.featureFlags = new EnvVariableFeatureFlags(); + this.featureFlags = featureFlags; this.useIsolatedPool = useIsolatedPool; } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java index 860acbec2315..ac6a139aef57 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerPerformanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.workers.general; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.StandardSyncInput; @@ -65,7 +66,7 @@ public class ReplicationWorkerPerformanceTest { public void executeOneSync() throws InterruptedException { final var perSource = new LimitedAirbyteSource(); final var perDestination = new EmptyAirbyteDestination(); - final var messageTracker = new AirbyteMessageTracker(); + final var messageTracker = new AirbyteMessageTracker(new EnvVariableFeatureFlags()); final var connectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class); final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01"); final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", ""); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java index 76e1eba9843c..87198e57a35b 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/DefaultAirbyteSourceTest.java @@ -17,6 +17,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.LoggingHelper.Color; @@ -80,6 +82,8 @@ class DefaultAirbyteSourceTest { private static Path logJobRoot; + private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); + static { try { logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test"); @@ -137,7 +141,7 @@ void testSuccessfulLifecycle() throws Exception { when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); - final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags); source.start(SOURCE_CONFIG, jobRoot); final List messages = Lists.newArrayList(); @@ -173,7 +177,7 @@ void testTaggedLogs() throws Exception { when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false); final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, - heartbeatMonitor); + heartbeatMonitor, featureFlags); source.start(SOURCE_CONFIG, jobRoot); final List messages = Lists.newArrayList(); @@ -198,7 +202,7 @@ void testTaggedLogs() throws Exception { @Test void testNonzeroExitCodeThrows() throws Exception { - final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags); tap.start(SOURCE_CONFIG, jobRoot); when(process.exitValue()).thenReturn(1); @@ -208,7 +212,7 @@ void testNonzeroExitCodeThrows() throws Exception { @Test void testIgnoredExitCodes() throws Exception { - final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags); tap.start(SOURCE_CONFIG, jobRoot); when(process.isAlive()).thenReturn(false); @@ -220,7 +224,7 @@ void testIgnoredExitCodes() throws Exception { @Test void testGetExitValue() throws Exception { - final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags); source.start(SOURCE_CONFIG, jobRoot); when(process.isAlive()).thenReturn(false); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTrackerTest.java index 8d6871ede3d1..1742d5512c9a 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTrackerTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.config.FailureReason; import io.airbyte.config.State; @@ -49,7 +50,7 @@ class AirbyteMessageTrackerTest { @BeforeEach void setup() { final StateMetricsTracker stateMetricsTracker = new StateMetricsTracker(10L * 1024L * 1024L); - this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker); + this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker, new EnvVariableFeatureFlags()); } @Test diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 8057446fa48b..458885c0d940 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.EnvConfigs; import io.airbyte.config.WorkerEnvConstants; import io.airbyte.workers.WorkerConfigs; @@ -49,14 +50,16 @@ class AirbyteIntegrationLauncherTest { CONFIG, "{}", CATALOG, "{}", "state", "{}"); + + private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags(); private static final Map JOB_METADATA = Map.of( WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()), - EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()), - EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces()); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), + EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()), + EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()), + EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); private WorkerConfigs workerConfigs; @Mock @@ -66,7 +69,8 @@ class AirbyteIntegrationLauncherTest { @BeforeEach void setUp() { workerConfigs = new WorkerConfigs(new EnvConfigs()); - launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false); + launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false, + featureFlags); } @Test diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index aa1858acda1e..60a40c89ded0 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -5,7 +5,7 @@ package io.airbyte.commons.features; /** - * Interface that describe which features are activated in airbyte. Currently the only + * Interface that describe which features are activated in airbyte. Currently, the only * implementation relies on env. Ideally it should be on some DB. */ public interface FeatureFlags { diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index e804ecb17adf..5ba9ab5944c7 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -123,7 +123,8 @@ public Optional runJob() throws Exception { sourceLauncherConfig.getDockerImage(), processFactory, syncInput.getSourceResourceRequirements(), - useIsolatedPool); + useIsolatedPool, + featureFlags); log.info("Setting up destination launcher..."); final var destinationLauncher = new AirbyteIntegrationLauncher( @@ -132,7 +133,8 @@ public Optional runJob() throws Exception { destinationLauncherConfig.getDockerImage(), processFactory, syncInput.getDestinationResourceRequirements(), - useIsolatedPool); + useIsolatedPool, + featureFlags); log.info("Setting up source..."); // reset jobs use an empty source to induce resetting all data in destination. @@ -140,7 +142,7 @@ public Optional runJob() throws Exception { WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource( featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(sourceLauncher, - getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER), featureFlags); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final var metricClient = MetricClientFactory.getMetricClient(); @@ -156,7 +158,7 @@ public Optional runJob() throws Exception { new DefaultAirbyteDestination(destinationLauncher, getStreamFactory(destinationLauncherConfig.getProtocolVersion(), DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), - new AirbyteMessageTracker(), + new AirbyteMessageTracker(featureFlags), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, new ConnectorConfigUpdater(sourceApi, destinationApi), diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java index 41dd24b41a36..ed15b8266f0f 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; @@ -1270,14 +1271,14 @@ protected void assertNamespaceNormalization(final String testCaseId, private ConnectorSpecification runSpec() throws WorkerException { return convertProtocolObject( new DefaultGetSpecWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false)) + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags())) .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(), ConnectorSpecification.class); } protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot) .getCheckConnection(); @@ -1287,7 +1288,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException( final JsonNode config) { try { final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot) .getCheckConnection(); @@ -1300,7 +1301,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException( protected AirbyteDestination getDestination() { return new DefaultAirbyteDestination( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false)); + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags())); } protected void runSyncAndVerifyStateOutput(final JsonNode config, diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index e95f2ec7dc2b..f3d1ae3598bc 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.verify; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.config.EnvConfigs; import io.airbyte.config.JobGetSpecConfig; @@ -157,21 +158,24 @@ public void tearDownInternal() throws Exception { protected ConnectorSpecification runSpec() throws WorkerException { final io.airbyte.protocol.models.ConnectorSpecification spec = new DefaultGetSpecWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false)) - .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(); + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new EnvVariableFeatureFlags())) + .run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(); return convertProtocolObject(spec, ConnectorSpecification.class); } protected StandardCheckConnectionOutput runCheck() throws Exception { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot).getCheckConnection(); } protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exception { return new DefaultCheckConnectionWorker( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getCheckConnection().getStatus().toString(); } @@ -179,7 +183,8 @@ protected String runCheckAndGetStatusAsString(final JsonNode config) throws Exce protected UUID runDiscover() throws Exception { final UUID toReturn = new DefaultDiscoverCatalogWorker( mConfigRepository, - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false), + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + new EnvVariableFeatureFlags()), mConnectorConfigUpdater) .run(new StandardDiscoverCatalogInput().withSourceId(SOURCE_ID.toString()).withConnectionConfiguration(getConfig()), jobRoot) .getDiscoverCatalogId(); @@ -210,8 +215,12 @@ protected List runRead(final ConfiguredAirbyteCatalog catalog, f .withState(state == null ? null : new State().withState(state)) .withCatalog(convertProtocolObject(catalog, io.airbyte.protocol.models.ConfiguredAirbyteCatalog.class)); + final var featureFlags = new EnvVariableFeatureFlags(); + final AirbyteSource source = new DefaultAirbyteSource( - new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false)); + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements(), false, + featureFlags), + featureFlags); final List messages = new ArrayList<>(); source.start(sourceConfig, jobRoot); while (!source.isFinished()) { @@ -260,10 +269,12 @@ protected ResourceRequirements prepareResourceRequirements(final Map prepareResourceRequestMapBySystemProperties() { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 8447ec40c775..7040ffba476f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; @@ -60,6 +61,7 @@ public class CheckConnectionActivityImpl implements CheckConnectionActivity { private final String airbyteVersion; private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final FeatureFlags featureFlags; public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConfigs workerConfigs, @Named("checkProcessFactory") final ProcessFactory processFactory, @@ -70,7 +72,8 @@ public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConf final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteMessageVersionedMigratorFactory migratorFactory, + final FeatureFlags featureFlags) { this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.workspaceRoot = workspaceRoot; @@ -81,6 +84,7 @@ public CheckConnectionActivityImpl(@Named("checkWorkerConfigs") final WorkerConf this.airbyteVersion = airbyteVersion; this.serDeProvider = serDeProvider; this.migratorFactory = migratorFactory; + this.featureFlags = featureFlags; } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @@ -135,7 +139,8 @@ private CheckedSupplier launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements(), - launcherConfig.getIsCustomConnector()); + launcherConfig.getIsCustomConnector(), + featureFlags); final ConnectorConfigUpdater connectorConfigUpdater = new ConnectorConfigUpdater( airbyteApiClient.getSourceApi(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 2e02e9de80e5..cd1304a7fe8f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.JsonNode; import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; @@ -59,10 +60,10 @@ public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { private final LogConfigs logConfigs; private final AirbyteApiClient airbyteApiClient; private final String airbyteVersion; - private final ConfigRepository configRepository; private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final FeatureFlags featureFlags; public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerConfigs workerConfigs, @Named("discoverProcessFactory") final ProcessFactory processFactory, @@ -74,7 +75,8 @@ public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerC final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteMessageVersionedMigratorFactory migratorFactory, + final FeatureFlags featureFlags) { this.configRepository = configRepository; this.workerConfigs = workerConfigs; this.processFactory = processFactory; @@ -86,6 +88,7 @@ public DiscoverCatalogActivityImpl(@Named("discoverWorkerConfigs") final WorkerC this.airbyteVersion = airbyteVersion; this.serDeProvider = serDeProvider; this.migratorFactory = migratorFactory; + this.featureFlags = featureFlags; } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @@ -121,11 +124,12 @@ public ConnectorJobOutput run(final JobRunConfig jobRunConfig, return temporalAttemptExecution.get(); } - private CheckedSupplier, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { + private CheckedSupplier, Exception> getWorkerFactory( + final IntegrationLauncherConfig launcherConfig) { return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), - processFactory, workerConfigs.getResourceRequirements(), launcherConfig.getIsCustomConnector()); + processFactory, workerConfigs.getResourceRequirements(), launcherConfig.getIsCustomConnector(), featureFlags); final AirbyteStreamFactory streamFactory = new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, launcherConfig.getProtocolVersion()); final ConnectorConfigUpdater connectorConfigUpdater = diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index 280196052ddb..5b7b8ffc82fa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -11,6 +11,7 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider; import io.airbyte.commons.protocol.AirbyteMessageVersionedMigratorFactory; @@ -56,6 +57,7 @@ public class SpecActivityImpl implements SpecActivity { private final String airbyteVersion; private final AirbyteMessageSerDeProvider serDeProvider; private final AirbyteMessageVersionedMigratorFactory migratorFactory; + private final FeatureFlags featureFlags; public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerConfigs, @Named("specProcessFactory") final ProcessFactory processFactory, @@ -65,7 +67,8 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo final AirbyteApiClient airbyteApiClient, @Value("${airbyte.version}") final String airbyteVersion, final AirbyteMessageSerDeProvider serDeProvider, - final AirbyteMessageVersionedMigratorFactory migratorFactory) { + final AirbyteMessageVersionedMigratorFactory migratorFactory, + final FeatureFlags featureFlags) { this.workerConfigs = workerConfigs; this.processFactory = processFactory; this.workspaceRoot = workspaceRoot; @@ -75,6 +78,7 @@ public SpecActivityImpl(@Named("specWorkerConfigs") final WorkerConfigs workerCo this.airbyteVersion = airbyteVersion; this.serDeProvider = serDeProvider; this.migratorFactory = migratorFactory; + this.featureFlags = featureFlags; } @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @@ -113,7 +117,8 @@ private CheckedSupplier, Exception> launcherConfig.getDockerImage(), processFactory, workerConfigs.getResourceRequirements(), - launcherConfig.getIsCustomConnector()); + launcherConfig.getIsCustomConnector(), + featureFlags); return new DefaultGetSpecWorker(integrationLauncher, streamFactory); }; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 820110a3eeed..9482a2e2d065 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -265,7 +265,8 @@ private static void traceReplicationSummary(final ReplicationAttemptSummary repl } } - private CheckedSupplier, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig sourceLauncherConfig, + private CheckedSupplier, Exception> getLegacyWorkerFactory( + final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, final StandardSyncInput syncInput) { @@ -276,21 +277,24 @@ private CheckedSupplier, Exception> sourceLauncherConfig.getDockerImage(), processFactory, syncInput.getSourceResourceRequirements(), - sourceLauncherConfig.getIsCustomConnector()); + sourceLauncherConfig.getIsCustomConnector(), + featureFlags); final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher( destinationLauncherConfig.getJobId(), Math.toIntExact(destinationLauncherConfig.getAttemptId()), destinationLauncherConfig.getDockerImage(), processFactory, syncInput.getDestinationResourceRequirements(), - destinationLauncherConfig.getIsCustomConnector()); + destinationLauncherConfig.getIsCustomConnector(), + featureFlags); // reset jobs use an empty source to induce resetting all data in destination. final AirbyteSource airbyteSource = isResetJob(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource(featureFlags.useStreamCapableState()) : new DefaultAirbyteSource(sourceLauncher, new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(), - DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER), + featureFlags); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); @@ -304,7 +308,7 @@ private CheckedSupplier, Exception> new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion(), DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER), new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), - new AirbyteMessageTracker(), + new AirbyteMessageTracker(featureFlags), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), metricReporter, new ConnectorConfigUpdater(airbyteApiClient.getSourceApi(), airbyteApiClient.getDestinationApi()), @@ -312,7 +316,8 @@ private CheckedSupplier, Exception> }; } - private CheckedSupplier, Exception> getContainerLauncherWorkerFactory(final ContainerOrchestratorConfig containerOrchestratorConfig, + private CheckedSupplier, Exception> getContainerLauncherWorkerFactory( + final ContainerOrchestratorConfig containerOrchestratorConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig,