diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java new file mode 100644 index 000000000000..1eeefa4828f9 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.features; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class FeatureFlagHelper { + + public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + final Set workspaceIds = new HashSet<>(); + if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { + for (final String id : workspaceIdsString.split(",")) { + try { + workspaceIds.add(UUID.fromString(id)); + } catch (final IllegalArgumentException e) { + log.warn("Malformed workspace id for field selection: {}", id); + } + } + } + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); + } + +} diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java new file mode 100644 index 000000000000..2b82701f8e0e --- /dev/null +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.features; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class FeatureFlagHelperTest { + + FeatureFlags featureFlags; + + @BeforeEach + void beforeEach() { + featureFlags = mock(FeatureFlags.class); + } + + @Test + void isFieldSelectionEnabledForWorkspaceWithEmptyString() { + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); + + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + } + + @Test + void isFieldSelectionEnabledForWorkspaceWithSpaceString() { + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); + + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + } + + @Test + void isFieldSelectionEnabledForWorkspaceWithNullString() { + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null); + + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + } + + @Test + void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { + final UUID workspaceId = UUID.randomUUID(); + final UUID randomId = UUID.randomUUID(); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString()); + + assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + } + + @Test + void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() { + final UUID workspaceId = UUID.randomUUID(); + final UUID randomId1 = UUID.randomUUID(); + final UUID randomId2 = UUID.randomUUID(); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString()); + + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + } + +} diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index 3aac41442302..6226ced2b346 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -10,6 +10,7 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_DOCKER_IMAGE_KEY; import datadog.trace.api.Trace; +import io.airbyte.commons.features.FeatureFlagHelper; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.MdcScope; @@ -45,11 +46,8 @@ import io.airbyte.workers.sync.ReplicationLauncherWorker; import java.lang.invoke.MethodHandles; import java.nio.file.Path; -import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,7 +149,7 @@ public Optional runJob() throws Exception { new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); + metricReporter, FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId())); log.info("Running replication worker..."); final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(), @@ -168,20 +166,4 @@ private AirbyteStreamFactory getStreamFactory(final Version protocolVersion, fin : new DefaultAirbyteStreamFactory(mdcScope); } - private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { - final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); - final Set workspaceIds = new HashSet<>(); - for (final String id : workspaceIdsString.split(",")) { - workspaceIds.add(UUID.fromString(id)); - } - for (final UUID workspace : workspaceIds) { - log.info("field selection workspace: {}", workspace); - } - if (workspaceId != null && workspaceIds.contains(workspaceId)) { - return true; - } - - return featureFlags.applyFieldSelection(); - } - } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 238fd7da2bc5..9800dc24d513 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -18,6 +18,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.commons.features.FeatureFlagHelper; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; @@ -72,10 +73,8 @@ import jakarta.inject.Singleton; import java.nio.file.Path; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -306,7 +305,7 @@ private CheckedSupplier, Exception> new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())), new AirbyteMessageTracker(), new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)), - metricReporter, enableFieldSelection(featureFlags, syncInput.getWorkspaceId())); + metricReporter, FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId())); }; } @@ -344,24 +343,4 @@ private boolean isResetJob(final String dockerImage) { return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage); } - private boolean enableFieldSelection(final FeatureFlags featureFlags, final UUID workspaceId) { - final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); - final Set workspaceIds = new HashSet<>(); - LOGGER.debug("Field selection enabled for {}", workspaceIdsString); - if (!workspaceIdsString.isEmpty()) { - for (final String id : workspaceIdsString.split(",")) { - try { - workspaceIds.add(UUID.fromString(id)); - } catch (IllegalArgumentException e) { - LOGGER.warn("Malformed workspace id for field selection: {}", id); - } - } - } - if (workspaceId != null && workspaceIds.contains(workspaceId)) { - return true; - } - - return featureFlags.applyFieldSelection(); - } - }