diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index e35e933acb91..dbc95e69c1b3 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -73,8 +73,6 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, - EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, - EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index d51747587fbc..c95bad235b5e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -223,8 +223,6 @@ public Process write(final Path jobRoot, private Map getWorkerMetadata() { final Configs configs = new EnvConfigs(); - // We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert - // back to hashmap return Maps.newHashMap( ImmutableMap.builder() .put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName) @@ -234,8 +232,6 @@ private Map getWorkerMetadata() { .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())) .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index da5040edf412..42a0e76086d9 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -66,8 +66,6 @@ class AirbyteIntegrationLauncherTest { .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema())) .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 62af19bf77cf..49e29b561d7b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -22,9 +22,6 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; - public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; - public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; - @Override public boolean autoDisablesFailingConnections() { log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); @@ -67,16 +64,6 @@ public String fieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); } - @Override - public String strictComparisonNormalizationWorkspaces() { - return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg); - } - - @Override - public String strictComparisonNormalizationTag() { - return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg); - } - // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java index debff56d38d3..efaf98fb0602 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -4,43 +4,31 @@ package io.airbyte.commons.features; -import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.Set; import java.util.UUID; -import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @Slf4j public class FeatureFlagHelper { public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection") - || featureFlags.applyFieldSelection(); - } - - public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId, - "strict comparison in normalization"); - } - - @VisibleForTesting - static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, - final Function flagRetriever, - final UUID workspaceId, - final String context) { - final String workspaceIdsString = flagRetriever.apply(featureFlags); + final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); final Set workspaceIds = new HashSet<>(); if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { for (final String id : workspaceIdsString.split(",")) { try { workspaceIds.add(UUID.fromString(id)); } catch (final IllegalArgumentException e) { - log.warn("Malformed workspace id for {}: {}", context, id); + log.warn("Malformed workspace id for field selection: {}", id); } } } - return workspaceId != null && workspaceIds.contains(workspaceId); + if (workspaceId != null && workspaceIds.contains(workspaceId)) { + return true; + } + + return featureFlags.applyFieldSelection(); } } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index d43006d6ae2b..aa2055047422 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -37,18 +37,4 @@ public interface FeatureFlags { */ String fieldSelectionWorkspaces(); - /** - * Get the workspaces allow-listed for strict incremental comparison in normalization. This takes - * precedence over the normalization version in destination_definitions.yaml. - * - * @return a comma-separated list of workspace ids where strict incremental comparison should be - * enabled in normalization. - */ - String strictComparisonNormalizationWorkspaces(); - - /** - * @return The Docker image tag representing the normalization version with strict-comparison - */ - String strictComparisonNormalizationTag(); - } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 6d000e86f8f1..6255a708d83b 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -26,37 +26,30 @@ void beforeEach() { void isFieldSelectionEnabledForWorkspaceWithEmptyString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); - } - - @Test - void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() { - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); - - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null)); + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithSpaceString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithNullString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString()); - assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); + assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); } @Test @@ -64,9 +57,9 @@ void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId1 = UUID.randomUUID(); final UUID randomId2 = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString()); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); + assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); } } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 2dac35e85694..d48d5603e847 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -758,10 +758,6 @@ public interface Configs { String getFieldSelectionWorkspaces(); - String getStrictComparisonNormalizationWorkspaces(); - - String getStrictComparisonNormalizationTag(); - enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index d28a25e0357b..d53a51ac009c 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -224,9 +224,6 @@ public class EnvConfigs implements Configs { private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; - private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; - private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; - public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), AIRBYTE_ROLE, EnvConfigs::getAirbyteRole, @@ -1155,16 +1152,6 @@ public String getFieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, ""); } - @Override - public String getStrictComparisonNormalizationWorkspaces() { - return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, ""); - } - - @Override - public String getStrictComparisonNormalizationTag() { - return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison"); - } - @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml index 72fd7ffbcdb8..584141e6e14f 100644 --- a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml @@ -21,7 +21,3 @@ properties: type: object description: optional resource requirements to run sync workers existingJavaType: io.airbyte.config.ResourceRequirements - workspaceId: - description: The id of the workspace associated with this sync - type: string - format: uuid diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index 42caf494f70d..7917e6c449c3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -96,9 +96,6 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection())); environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); - environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, - featureFlags.strictComparisonNormalizationWorkspaces()); - environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 8b7ef2f885d0..db664fbc29df 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -13,8 +13,6 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.model.generated.JobIdRequestBody; -import io.airbyte.commons.features.FeatureFlagHelper; -import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper; @@ -52,10 +50,8 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; @Singleton -@Slf4j public class NormalizationActivityImpl implements NormalizationActivity { private final Optional containerOrchestratorConfig; @@ -66,17 +62,13 @@ public class NormalizationActivityImpl implements NormalizationActivity { private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; private final String airbyteVersion; - private final FeatureFlags featureFlags; private final Integer serverPort; private final AirbyteConfigValidator airbyteConfigValidator; private final TemporalUtils temporalUtils; private final ResourceRequirements normalizationResourceRequirements; private final AirbyteApiClient airbyteApiClient; - // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. - private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); - private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; - private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; + private final static Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -86,7 +78,6 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, @Value("${airbyte.version}") final String airbyteVersion, - final FeatureFlags featureFlags, @Value("${micronaut.server.port}") final Integer serverPort, final AirbyteConfigValidator airbyteConfigValidator, final TemporalUtils temporalUtils, @@ -100,7 +91,6 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; this.airbyteVersion = airbyteVersion; - this.featureFlags = featureFlags; this.serverPort = serverPort; this.airbyteConfigValidator = airbyteConfigValidator; this.temporalUtils = temporalUtils; @@ -121,17 +111,11 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); - if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { - log.info("Using strict comparison normalization"); - replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); - } - // Check the version of normalization // We require at least version 0.3.0 to support data types v1. Using an older version would lead to // all columns being typed as JSONB. If normalization is using an older version, fallback to using // v0 data types. if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) { - log.info("Using protocol v0"); CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog()); } else { @@ -140,7 +124,6 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // phase v0 out. // Performance impact should be low considering the nature of the check compared to the time to run // normalization. - log.info("Using protocol v1"); CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); } @@ -151,7 +134,6 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; - log.info("Using normalization: " + destinationLauncherConfig.getNormalizationDockerImage()); if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig, () -> context); @@ -180,15 +162,15 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn return new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) .withCatalog(syncOutput.getOutputCatalog()) - .withResourceRequirements(normalizationResourceRequirements) - .withWorkspaceId(syncInput.getWorkspaceId()); + .withResourceRequirements(normalizationResourceRequirements); } @VisibleForTesting static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) { try { - final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig)); - return V1_NORMALIZATION_MINOR_VERSION.equals(normalizationVersion.getMinorVersion()); + final String[] normalizationImage = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); + final Version normalizationVersion = new Version(normalizationImage[1]); + return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. // The current behavior is to assume it supports v0 data types for dev purposes. @@ -196,17 +178,6 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig } } - private static String getNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig) { - return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1]; - } - - @VisibleForTesting - static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { - final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); - imageComponents[1] = newTag; - destinationLauncherConfig.setNormalizationDockerImage(String.join(":", imageComponents)); - } - private CheckedSupplier, Exception> getLegacyWorkerFactory( final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index 0c75c219b0c7..9164f5c2834b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -4,11 +4,6 @@ package io.airbyte.workers.temporal.sync; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import io.airbyte.commons.features.FeatureFlags; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -20,20 +15,9 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.2.5"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.1.1"))); Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.3.0"))); - Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); - Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); + Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); + Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); - Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("strict_comparison"))); - } - - @Test - void checkNormalizationTagReplacement() { - final FeatureFlags featureFlags = mock(FeatureFlags.class); - when(featureFlags.strictComparisonNormalizationTag()).thenReturn("strict_comparison"); - - final IntegrationLauncherConfig config = withNormalizationVersion("0.2.25"); - NormalizationActivityImpl.replaceNormalizationImageTag(config, "strict_comparison"); - assertEquals("normalization:strict_comparison", config.getNormalizationDockerImage()); } private IntegrationLauncherConfig withNormalizationVersion(final String version) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 73e04f5b9fa8..bd605a6f6aaf 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -455,7 +455,7 @@ private static void verifyPersistState(final PersistStateActivity persistStateAc configuredCatalog); } - private void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { + private static void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { verify(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index e37bf9af02e3..af9517b02f69 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -310,16 +310,6 @@ spec: configMapKeyRef: name: {{ .Release.Name }}-airbyte-env key: USE_STREAM_CAPABLE_STATE - - name: STRICT_COMPARISON_NORMALIZATION_TAG - valueFrom: - configMapKeyRef: - name: {{ .Release.Name }}-airbyte-env - key: STRICT_COMPARISON_NORMALIZATION_TAG - - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - valueFrom: - configMapKeyRef: - name: {{ .Release.Name }}-airbyte-env - key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: diff --git a/docker-compose.yaml b/docker-compose.yaml index a8baa3c3ecbf..6392166bcd70 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -109,8 +109,6 @@ services: - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} - - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} - - STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG} configs: - flags volumes: diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 4fb2605845f6..3c426ded1708 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -300,16 +300,6 @@ spec: configMapKeyRef: name: airbyte-env key: USE_STREAM_CAPABLE_STATE - - name: STRICT_COMPARISON_NORMALIZATION_TAG - valueFrom: - configMapKeyRef: - name: airbyte-env - key: STRICT_COMPARISON_NORMALIZATION_TAG - - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - valueFrom: - configMapKeyRef: - name: airbyte-env - key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: