From 14baa3bdba3510284b59da0b17674230d26c6aaf Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 7 Feb 2023 17:07:28 -0800 Subject: [PATCH] Add normalization strict incremental feature flag (#22514) * maybe add feature flag? * pattern matching * also add version check * formatting * refactor test also * extract test + fix method call * minor tweaks * add context to log message * put workspace id in normalization input * use non-semver tag * add flag for version of normalization * also flag old version * add test * missed part of the commit * format * add test for null workspace ID * Revert "also flag old version" This reverts commit 3be601d16cbc929678732462ae7f5c7ddfe5ef37. * Revert "missed part of the commit" This reverts commit 47a67b4631cb359e3b4caa0bd75e7848fae9c665. * always apply flag, even if we're behind a version * derp * Add more logging to the normalization activity * Update charts and kustomize for the feature flag * Format --------- Co-authored-by: Edward Gao --- .../temporal/sync/OrchestratorConstants.java | 2 + .../process/AirbyteIntegrationLauncher.java | 4 ++ .../AirbyteIntegrationLauncherTest.java | 2 + .../features/EnvVariableFeatureFlags.java | 13 +++++++ .../commons/features/FeatureFlagHelper.java | 26 +++++++++---- .../commons/features/FeatureFlags.java | 14 +++++++ .../features/FeatureFlagHelperTest.java | 21 ++++++---- .../main/java/io/airbyte/config/Configs.java | 4 ++ .../java/io/airbyte/config/EnvConfigs.java | 13 +++++++ .../resources/types/NormalizationInput.yaml | 4 ++ ...ontainerOrchestratorConfigBeanFactory.java | 3 ++ .../sync/NormalizationActivityImpl.java | 39 ++++++++++++++++--- .../sync/NormalizationActivityImplTest.java | 20 +++++++++- .../temporal/sync/SyncWorkflowTest.java | 2 +- .../airbyte-worker/templates/deployment.yaml | 10 +++++ docker-compose.yaml | 2 + kube/resources/worker.yaml | 10 +++++ 17 files changed, 167 insertions(+), 22 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index cf0a128d3c8a..6741b2b79748 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -73,6 +73,8 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index dd9dcd42499f..853a5e9179ed 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -223,6 +223,8 @@ public Process write(final Path jobRoot, private Map getWorkerMetadata() { final Configs configs = new EnvConfigs(); + // We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert + // back to hashmap return Maps.newHashMap( ImmutableMap.builder() .put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName) @@ -232,6 +234,8 @@ private Map getWorkerMetadata() { .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())) .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 45f7c5da2bcb..2820bd350622 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -66,6 +66,8 @@ class AirbyteIntegrationLauncherTest { .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema())) .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index b574d0a40cee..30c7bdcbc24b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -22,6 +22,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; + public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; + @Override public boolean autoDisablesFailingConnections() { log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); @@ -64,6 +67,16 @@ public String fieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); } + @Override + public String strictComparisonNormalizationWorkspaces() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg); + } + + @Override + public String strictComparisonNormalizationTag() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java index 1eeefa4828f9..a4ff90ddae24 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -4,31 +4,43 @@ package io.airbyte.commons.features; +import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @Slf4j public class FeatureFlagHelper { public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection") + || featureFlags.applyFieldSelection(); + } + + public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId, + "strict comparison in normalization"); + } + + @VisibleForTesting + static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, + final Function flagRetriever, + final UUID workspaceId, + final String context) { + final String workspaceIdsString = flagRetriever.apply(featureFlags); final Set workspaceIds = new HashSet<>(); if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { for (final String id : workspaceIdsString.split(",")) { try { workspaceIds.add(UUID.fromString(id)); } catch (final IllegalArgumentException e) { - log.warn("Malformed workspace id for field selection: {}", id); + log.warn("Malformed workspace id for {}: {}", context, id); } } } - if (workspaceId != null && workspaceIds.contains(workspaceId)) { - return true; - } - - return featureFlags.applyFieldSelection(); + return workspaceId != null && workspaceIds.contains(workspaceId); } } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 60a40c89ded0..eb7ab4cf48cf 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -37,4 +37,18 @@ public interface FeatureFlags { */ String fieldSelectionWorkspaces(); + /** + * Get the workspaces allow-listed for strict incremental comparison in normalization. This takes + * precedence over the normalization version in destination_definitions.yaml. + * + * @return a comma-separated list of workspace ids where strict incremental comparison should be + * enabled in normalization. + */ + String strictComparisonNormalizationWorkspaces(); + + /** + * @return The Docker image tag representing the normalization version with strict-comparison + */ + String strictComparisonNormalizationTag(); + } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 2b82701f8e0e..c42bfafc3b0b 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -26,30 +26,37 @@ void beforeEach() { void isFieldSelectionEnabledForWorkspaceWithEmptyString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); + } + + @Test + void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() { + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); + + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null)); } @Test void isFieldSelectionEnabledForWorkspaceWithSpaceString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } @Test void isFieldSelectionEnabledForWorkspaceWithNullString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } @Test void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString()); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); - assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); } @Test @@ -57,9 +64,9 @@ void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId1 = UUID.randomUUID(); final UUID randomId2 = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString()); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); } } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index f609ccba3ec7..aa43779da4cc 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -758,6 +758,10 @@ public interface Configs { String getFieldSelectionWorkspaces(); + String getStrictComparisonNormalizationWorkspaces(); + + String getStrictComparisonNormalizationTag(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index ca2fdc3488d0..c9a4296cfda3 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -224,6 +224,9 @@ public class EnvConfigs implements Configs { private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; + private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; + public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), AIRBYTE_ROLE, EnvConfigs::getAirbyteRole, @@ -1152,6 +1155,16 @@ public String getFieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, ""); } + @Override + public String getStrictComparisonNormalizationWorkspaces() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, ""); + } + + @Override + public String getStrictComparisonNormalizationTag() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison"); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml index 584141e6e14f..72fd7ffbcdb8 100644 --- a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml @@ -21,3 +21,7 @@ properties: type: object description: optional resource requirements to run sync workers existingJavaType: io.airbyte.config.ResourceRequirements + workspaceId: + description: The id of the workspace associated with this sync + type: string + format: uuid diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index d01617daba51..0db20692f318 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -96,6 +96,9 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection())); environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); + environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, + featureFlags.strictComparisonNormalizationWorkspaces()); + environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index ddf1ca1c4ba0..4f96db8b5941 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -13,6 +13,8 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.commons.features.FeatureFlagHelper; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper; @@ -50,8 +52,10 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; @Singleton +@Slf4j public class NormalizationActivityImpl implements NormalizationActivity { private final Optional containerOrchestratorConfig; @@ -62,13 +66,17 @@ public class NormalizationActivityImpl implements NormalizationActivity { private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; private final String airbyteVersion; + private final FeatureFlags featureFlags; private final Integer serverPort; private final AirbyteConfigValidator airbyteConfigValidator; private final TemporalUtils temporalUtils; private final ResourceRequirements normalizationResourceRequirements; private final AirbyteApiClient airbyteApiClient; - private final static Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); + // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. + private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); + private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; + private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -78,6 +86,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, @Value("${airbyte.version}") final String airbyteVersion, + final FeatureFlags featureFlags, @Value("${micronaut.server.port}") final Integer serverPort, final AirbyteConfigValidator airbyteConfigValidator, final TemporalUtils temporalUtils, @@ -91,6 +100,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; this.airbyteVersion = airbyteVersion; + this.featureFlags = featureFlags; this.serverPort = serverPort; this.airbyteConfigValidator = airbyteConfigValidator; this.temporalUtils = temporalUtils; @@ -111,11 +121,17 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { + log.info("Using strict comparison normalization"); + replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); + } + // Check the version of normalization // We require at least version 0.3.0 to support data types v1. Using an older version would lead to // all columns being typed as JSONB. If normalization is using an older version, fallback to using // v0 data types. if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) { + log.info("Using protocol v0"); CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog()); } else { @@ -124,6 +140,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // phase v0 out. // Performance impact should be low considering the nature of the check compared to the time to run // normalization. + log.info("Using protocol v1"); CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); } @@ -134,6 +151,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; + log.info("Using normalization: " + destinationLauncherConfig.getNormalizationDockerImage()); if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig, () -> context); @@ -162,15 +180,15 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn return new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) .withCatalog(syncOutput.getOutputCatalog()) - .withResourceRequirements(normalizationResourceRequirements); + .withResourceRequirements(normalizationResourceRequirements) + .withWorkspaceId(syncInput.getWorkspaceId()); } @VisibleForTesting static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) { try { - final String[] normalizationImage = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); - final Version normalizationVersion = new Version(normalizationImage[1]); - return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); + final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig)); + return V1_NORMALIZATION_MINOR_VERSION.equals(normalizationVersion.getMinorVersion()); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. // The current behavior is to assume it supports v0 data types for dev purposes. @@ -178,6 +196,17 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig } } + private static String getNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig) { + return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1]; + } + + @VisibleForTesting + static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { + final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); + imageComponents[1] = newTag; + destinationLauncherConfig.setNormalizationDockerImage(String.join(":", imageComponents)); + } + private CheckedSupplier, Exception> getLegacyWorkerFactory( final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index aff7afb1ab57..cc08695d8b61 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -4,6 +4,11 @@ package io.airbyte.workers.temporal.sync; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -15,9 +20,20 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.2.5"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.1.1"))); Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.3.0"))); - Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); - Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("strict_comparison"))); + } + + @Test + void checkNormalizationTagReplacement() { + final FeatureFlags featureFlags = mock(FeatureFlags.class); + when(featureFlags.strictComparisonNormalizationTag()).thenReturn("strict_comparison"); + + final IntegrationLauncherConfig config = withNormalizationVersion("0.2.25"); + NormalizationActivityImpl.replaceNormalizationImageTag(config, "strict_comparison"); + assertEquals("normalization:strict_comparison", config.getNormalizationDockerImage()); } private IntegrationLauncherConfig withNormalizationVersion(final String version) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 9fc0bf684ef4..7c94965130a3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -455,7 +455,7 @@ private static void verifyPersistState(final PersistStateActivity persistStateAc configuredCatalog); } - private static void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { + private void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { verify(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index af9517b02f69..e37bf9af02e3 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -310,6 +310,16 @@ spec: configMapKeyRef: name: {{ .Release.Name }}-airbyte-env key: USE_STREAM_CAPABLE_STATE + - name: STRICT_COMPARISON_NORMALIZATION_TAG + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_TAG + - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: diff --git a/docker-compose.yaml b/docker-compose.yaml index 6392166bcd70..a8baa3c3ecbf 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -109,6 +109,8 @@ services: - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} + - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} + - STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG} configs: - flags volumes: diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index d8f7be704011..38cf19fe268d 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -280,6 +280,16 @@ spec: configMapKeyRef: name: airbyte-env key: USE_STREAM_CAPABLE_STATE + - name: STRICT_COMPARISON_NORMALIZATION_TAG + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_TAG + - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: