From 6b2b804950dd67e11332864d3336e1df333e6a24 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 13:08:16 -0800 Subject: [PATCH 01/23] maybe add feature flag? --- .../features/EnvVariableFeatureFlags.java | 9 ++++++++ .../commons/features/FeatureFlagHelper.java | 22 ++++++++++++------- .../commons/features/FeatureFlags.java | 8 +++++++ .../temporal/sync/NormalizationActivity.java | 4 +++- .../sync/NormalizationActivityImpl.java | 21 ++++++++++++++++-- .../temporal/sync/SyncWorkflowImpl.java | 2 +- .../temporal/sync/SyncWorkflowTest.java | 17 +++++++++----- 7 files changed, 65 insertions(+), 18 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index b574d0a40cee..b864cee44a1a 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -22,6 +22,10 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; + public static final String APPLY_STRICT_COMPARISON_NORMALIZATION = "APPLY_STRICT_COMPARISON_NORMALIZATION"; + + public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + @Override public boolean autoDisablesFailingConnections() { log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); @@ -64,6 +68,11 @@ public String fieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg); } + @Override + public String strictComparisonNormalizationWorkspaces() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java index 1eeefa4828f9..99c839130cfd 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -4,31 +4,37 @@ package io.airbyte.commons.features; +import com.google.common.annotations.VisibleForTesting; import java.util.HashSet; import java.util.Set; import java.util.UUID; +import java.util.function.Function; import lombok.extern.slf4j.Slf4j; @Slf4j public class FeatureFlagHelper { public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces(); + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId) || featureFlags.applyFieldSelection(); + } + + public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId); + } + + @VisibleForTesting + static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, final Function flagRetriever, final UUID workspaceId) { + final String workspaceIdsString = flagRetriever.apply(featureFlags); final Set workspaceIds = new HashSet<>(); if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { for (final String id : workspaceIdsString.split(",")) { try { workspaceIds.add(UUID.fromString(id)); } catch (final IllegalArgumentException e) { - log.warn("Malformed workspace id for field selection: {}", id); + log.warn("Malformed workspace id: {}", id); } } } - if (workspaceId != null && workspaceIds.contains(workspaceId)) { - return true; - } - - return featureFlags.applyFieldSelection(); + return workspaceId != null && workspaceIds.contains(workspaceId); } - } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 60a40c89ded0..151ce381baac 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -37,4 +37,12 @@ public interface FeatureFlags { */ String fieldSelectionWorkspaces(); + /** + * Get the workspaces allow-listed for strict incremental comparison in normalization. + * This takes precedence over the normalization version in destination_definitions.yaml. + * + * @return a comma-separated list of workspace ids where strict incremental comparison should be enabled in normalization. + */ + String strictComparisonNormalizationWorkspaces(); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java index 5c301d8fadcd..c11549207b3a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java @@ -12,6 +12,7 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; +import java.util.UUID; @ActivityInterface public interface NormalizationActivity { @@ -19,7 +20,8 @@ public interface NormalizationActivity { @ActivityMethod NormalizationSummary normalize(JobRunConfig jobRunConfig, IntegrationLauncherConfig destinationLauncherConfig, - NormalizationInput input); + NormalizationInput input, + final UUID workspaceId); @ActivityMethod NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index ddf1ca1c4ba0..bca424ba1249 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -13,6 +13,8 @@ import datadog.trace.api.Trace; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.commons.features.FeatureFlagHelper; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.protocol.migrations.v1.CatalogMigrationV1Helper; @@ -62,13 +64,15 @@ public class NormalizationActivityImpl implements NormalizationActivity { private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; private final String airbyteVersion; + private final FeatureFlags featureFlags; private final Integer serverPort; private final AirbyteConfigValidator airbyteConfigValidator; private final TemporalUtils temporalUtils; private final ResourceRequirements normalizationResourceRequirements; private final AirbyteApiClient airbyteApiClient; - private final static Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); + private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); + private static final String STRICT_COMPARISON_IMAGE_TAG = "0.4.0"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -78,6 +82,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt final WorkerEnvironment workerEnvironment, final LogConfigs logConfigs, @Value("${airbyte.version}") final String airbyteVersion, + final FeatureFlags featureFlags, @Value("${micronaut.server.port}") final Integer serverPort, final AirbyteConfigValidator airbyteConfigValidator, final TemporalUtils temporalUtils, @@ -91,6 +96,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt this.workerEnvironment = workerEnvironment; this.logConfigs = logConfigs; this.airbyteVersion = airbyteVersion; + this.featureFlags = featureFlags; this.serverPort = serverPort; this.airbyteConfigValidator = airbyteConfigValidator; this.temporalUtils = temporalUtils; @@ -102,7 +108,8 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt @Override public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig destinationLauncherConfig, - final NormalizationInput input) { + final NormalizationInput input, + final UUID workspaceId) { ApmTraceUtils.addTagsToTrace( Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage())); @@ -111,6 +118,10 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId)) { + replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); + } + // Check the version of normalization // We require at least version 0.3.0 to support data types v1. Using an older version would lead to // all columns being typed as JSONB. If normalization is using an older version, fallback to using @@ -178,6 +189,12 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig } } + private static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { + final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); + imageComponents[1] = newTag; + destinationLauncherConfig.setDockerImage(String.join(":", imageComponents)); + } + private CheckedSupplier, Exception> getLegacyWorkerFactory( final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index df395df38f64..2107668b319a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -140,7 +140,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, LOGGER.info("generating normalization input"); final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput); final NormalizationSummary normalizationSummary = - normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); + normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput, syncInput.getWorkspaceId()); syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) { final OperatorDbtInput operatorDbtInput = new OperatorDbtInput() diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 5f816f76c040..1af9f1b85f69 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -231,7 +231,8 @@ void testSuccess() { doReturn(normalizationSummary).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput); + normalizationInput, + syncInput.getWorkspaceId()); final StandardSyncOutput actualOutput = execute(); @@ -276,7 +277,8 @@ void testReplicationFailedGracefully() { doReturn(normalizationSummary).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput); + normalizationInput, + syncInput.getWorkspaceId()); final StandardSyncOutput actualOutput = execute(); @@ -303,7 +305,8 @@ void testNormalizationFailure() { doThrow(new IllegalArgumentException("induced exception")).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput); + normalizationInput, + syncInput.getWorkspaceId()); assertThrows(WorkflowFailedException.class, this::execute); @@ -350,7 +353,8 @@ void testCancelDuringNormalization() { }).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput); + normalizationInput, + syncInput.getWorkspaceId()); assertThrows(WorkflowFailedException.class, this::execute); @@ -453,11 +457,12 @@ private static void verifyPersistState(final PersistStateActivity persistStateAc configuredCatalog); } - private static void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { + private void verifyNormalize(final NormalizationActivity normalizationActivity, final NormalizationInput normalizationInput) { verify(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput); + normalizationInput, + syncInput.getWorkspaceId()); } private static void verifyDbtTransform(final DbtTransformationActivity dbtTransformationActivity, From 0264e35818fd704629936918f4b30043a6fdda4e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 13:16:41 -0800 Subject: [PATCH 02/23] pattern matching --- .../commons/temporal/sync/OrchestratorConstants.java | 1 + .../workers/process/AirbyteIntegrationLauncher.java | 1 + .../workers/process/AirbyteIntegrationLauncherTest.java | 1 + .../airbyte/commons/features/EnvVariableFeatureFlags.java | 2 -- .../src/main/java/io/airbyte/config/Configs.java | 2 ++ .../src/main/java/io/airbyte/config/EnvConfigs.java | 7 +++++++ .../config/ContainerOrchestratorConfigBeanFactory.java | 2 ++ docker-compose.yaml | 1 + 8 files changed, 15 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index cf0a128d3c8a..414952d20564 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -73,6 +73,7 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 6ac2bea46e3d..e8e9224a2a39 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -230,6 +230,7 @@ private Map getWorkerMetadata() { EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()), EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()), EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces(), + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces(), EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit(), EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest(), EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 23f41fdb037b..4279779a9e08 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -63,6 +63,7 @@ class AirbyteIntegrationLauncherTest { EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()), EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()), EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces(), + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces(), EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest(), EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit(), EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index b864cee44a1a..363e79c5d313 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -22,8 +22,6 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; - public static final String APPLY_STRICT_COMPARISON_NORMALIZATION = "APPLY_STRICT_COMPARISON_NORMALIZATION"; - public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; @Override diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 7679e9c23cf8..8d238629af85 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -751,6 +751,8 @@ public interface Configs { String getFieldSelectionWorkspaces(); + String getStrictComparisonNormalizationWorkspaces(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 6e0c3d81ca95..7de8e3c39942 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -224,6 +224,8 @@ public class EnvConfigs implements Configs { private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; + private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), AIRBYTE_ROLE, EnvConfigs::getAirbyteRole, @@ -1145,6 +1147,11 @@ public String getFieldSelectionWorkspaces() { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, ""); } + @Override + public String getStrictComparisonNormalizationWorkspaces() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, ""); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index 346db8ead506..d84375c47f67 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -96,6 +96,8 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema())); environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection())); environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); + environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, + featureFlags.strictComparisonNormalizationWorkspaces()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/docker-compose.yaml b/docker-compose.yaml index 6392166bcd70..2e8e6efc6b85 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -109,6 +109,7 @@ services: - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} + - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} configs: - flags volumes: From 2359f24821a7de3aa602fa75e1d1218ead9c906b Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 13:24:33 -0800 Subject: [PATCH 03/23] also add version check --- .../temporal/sync/NormalizationActivityImpl.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index bca424ba1249..5056f4a3b5aa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -118,7 +118,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); - if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId)) { + // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're trying to use a newer version of normalization + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId) && "0.2.25".equals(getNormalizationImageTag(destinationLauncherConfig))) { replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); } @@ -179,8 +180,7 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn @VisibleForTesting static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) { try { - final String[] normalizationImage = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); - final Version normalizationVersion = new Version(normalizationImage[1]); + final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig)); return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. @@ -189,6 +189,10 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig } } + private static String getNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig) { + return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1]; + } + private static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); imageComponents[1] = newTag; From 91073d74d0ed9060509392ea5eeb1e5f26b3ed99 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 13:46:53 -0800 Subject: [PATCH 04/23] formatting --- .../io/airbyte/commons/features/FeatureFlagHelper.java | 5 ++++- .../java/io/airbyte/commons/features/FeatureFlags.java | 7 ++++--- .../workers/temporal/sync/NormalizationActivityImpl.java | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java index 99c839130cfd..dbef37914b0d 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -23,7 +23,9 @@ public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final F } @VisibleForTesting - static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, final Function flagRetriever, final UUID workspaceId) { + static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, + final Function flagRetriever, + final UUID workspaceId) { final String workspaceIdsString = flagRetriever.apply(featureFlags); final Set workspaceIds = new HashSet<>(); if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { @@ -37,4 +39,5 @@ static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, final } return workspaceId != null && workspaceIds.contains(workspaceId); } + } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 151ce381baac..98ef0a601f44 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -38,10 +38,11 @@ public interface FeatureFlags { String fieldSelectionWorkspaces(); /** - * Get the workspaces allow-listed for strict incremental comparison in normalization. - * This takes precedence over the normalization version in destination_definitions.yaml. + * Get the workspaces allow-listed for strict incremental comparison in normalization. This takes + * precedence over the normalization version in destination_definitions.yaml. * - * @return a comma-separated list of workspace ids where strict incremental comparison should be enabled in normalization. + * @return a comma-separated list of workspace ids where strict incremental comparison should be + * enabled in normalization. */ String strictComparisonNormalizationWorkspaces(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 5056f4a3b5aa..ed667262e826 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -118,8 +118,10 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); - // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're trying to use a newer version of normalization - if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId) && "0.2.25".equals(getNormalizationImageTag(destinationLauncherConfig))) { + // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're + // trying to use a newer version of normalization + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId) + && "0.2.25".equals(getNormalizationImageTag(destinationLauncherConfig))) { replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); } From 060fab06b3dd93bb4f3842a2a371392673201736 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 13:50:27 -0800 Subject: [PATCH 05/23] refactor test also --- .../commons/features/FeatureFlagHelperTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 2b82701f8e0e..3702adbbef8b 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -26,30 +26,30 @@ void beforeEach() { void isFieldSelectionEnabledForWorkspaceWithEmptyString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithSpaceString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithNullString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); } @Test void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString()); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); - assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId)); } @Test @@ -57,9 +57,9 @@ void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId1 = UUID.randomUUID(); final UUID randomId2 = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString()); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2); - assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId)); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId)); } } From 6bb760a35041f18f88796d37a4246094c3fdad5e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 14:00:51 -0800 Subject: [PATCH 06/23] extract test + fix method call --- .../sync/NormalizationActivityImpl.java | 18 +++++++++++++----- .../sync/NormalizationActivityImplTest.java | 13 +++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index ed667262e826..d4fbf78681a5 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -73,6 +73,7 @@ public class NormalizationActivityImpl implements NormalizationActivity { private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); private static final String STRICT_COMPARISON_IMAGE_TAG = "0.4.0"; + private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -120,9 +121,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're // trying to use a newer version of normalization - if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId) - && "0.2.25".equals(getNormalizationImageTag(destinationLauncherConfig))) { - replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId)) { + activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig); } // Check the version of normalization @@ -170,6 +170,13 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, () -> context); } + @VisibleForTesting + static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig) { + if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { + replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); + } + } + @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput) { @@ -195,10 +202,11 @@ private static String getNormalizationImageTag(final IntegrationLauncherConfig d return destinationLauncherConfig.getNormalizationDockerImage().split(":", 2)[1]; } - private static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { + @VisibleForTesting + static void replaceNormalizationImageTag(final IntegrationLauncherConfig destinationLauncherConfig, final String newTag) { final String[] imageComponents = destinationLauncherConfig.getNormalizationDockerImage().split(":", 2); imageComponents[1] = newTag; - destinationLauncherConfig.setDockerImage(String.join(":", imageComponents)); + destinationLauncherConfig.setNormalizationDockerImage(String.join(":", imageComponents)); } private CheckedSupplier, Exception> getLegacyWorkerFactory( diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index aff7afb1ab57..a167758ca457 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -4,6 +4,8 @@ package io.airbyte.workers.temporal.sync; +import static org.junit.jupiter.api.Assertions.assertEquals; + import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -20,6 +22,17 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); } + @Test + void checkNormalizationTagReplacement() { + final IntegrationLauncherConfig config1 = withNormalizationVersion("0.2.25"); + NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config1); + assertEquals("normalization:0.4.0", config1.getNormalizationDockerImage()); + + final IntegrationLauncherConfig config2 = withNormalizationVersion("0.2.26"); + NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config2); + assertEquals("normalization:0.2.26", config2.getNormalizationDockerImage()); + } + private IntegrationLauncherConfig withNormalizationVersion(final String version) { return new IntegrationLauncherConfig() .withNormalizationDockerImage("normalization:" + version); From ae0b7e36b17dd0a086d517a90f42abc6795f4be7 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 14:07:15 -0800 Subject: [PATCH 07/23] minor tweaks --- .../io/airbyte/commons/features/FeatureFlagHelperTest.java | 2 +- .../workers/temporal/sync/NormalizationActivityImpl.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 3702adbbef8b..393ac11fb5a0 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -47,7 +47,7 @@ void isFieldSelectionEnabledForWorkspaceWithNullString() { void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { final UUID workspaceId = UUID.randomUUID(); final UUID randomId = UUID.randomUUID(); - when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId)); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index d4fbf78681a5..ba8100d0193c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -119,8 +119,6 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); - // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're - // trying to use a newer version of normalization if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId)) { activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig); } @@ -172,6 +170,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, @VisibleForTesting static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig) { + // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're + // trying to use a newer version of normalization if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); } From 4bb6a1fcb71ddc06227fe267897bf35b3ba388e4 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 14:51:33 -0800 Subject: [PATCH 08/23] add context to log message --- .../airbyte/commons/features/FeatureFlagHelper.java | 11 +++++++---- .../commons/features/FeatureFlagHelperTest.java | 10 +++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java index dbef37914b0d..a4ff90ddae24 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlagHelper.java @@ -15,17 +15,20 @@ public class FeatureFlagHelper { public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId) || featureFlags.applyFieldSelection(); + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection") + || featureFlags.applyFieldSelection(); } public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) { - return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId); + return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId, + "strict comparison in normalization"); } @VisibleForTesting static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, final Function flagRetriever, - final UUID workspaceId) { + final UUID workspaceId, + final String context) { final String workspaceIdsString = flagRetriever.apply(featureFlags); final Set workspaceIds = new HashSet<>(); if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { @@ -33,7 +36,7 @@ static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags, try { workspaceIds.add(UUID.fromString(id)); } catch (final IllegalArgumentException e) { - log.warn("Malformed workspace id: {}", id); + log.warn("Malformed workspace id for {}: {}", context, id); } } } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 393ac11fb5a0..7846e1e8aea5 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -26,21 +26,21 @@ void beforeEach() { void isFieldSelectionEnabledForWorkspaceWithEmptyString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } @Test void isFieldSelectionEnabledForWorkspaceWithSpaceString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } @Test void isFieldSelectionEnabledForWorkspaceWithNullString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID())); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } @Test @@ -49,7 +49,7 @@ void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() { final UUID randomId = UUID.randomUUID(); when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId); - assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId)); + assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); } @Test @@ -59,7 +59,7 @@ void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() { final UUID randomId2 = UUID.randomUUID(); when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2); - assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId)); + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null)); } } From 679c0dc026f49efc8078e7be8033202f43a34ba5 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 14:53:35 -0800 Subject: [PATCH 09/23] put workspace id in normalization input --- .../main/resources/types/NormalizationInput.yaml | 4 ++++ .../temporal/sync/NormalizationActivity.java | 3 +-- .../temporal/sync/NormalizationActivityImpl.java | 12 +++++++----- .../workers/temporal/sync/SyncWorkflowImpl.java | 2 +- .../workers/temporal/sync/SyncWorkflowTest.java | 15 +++++---------- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml index 584141e6e14f..72fd7ffbcdb8 100644 --- a/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml +++ b/airbyte-config/config-models/src/main/resources/types/NormalizationInput.yaml @@ -21,3 +21,7 @@ properties: type: object description: optional resource requirements to run sync workers existingJavaType: io.airbyte.config.ResourceRequirements + workspaceId: + description: The id of the workspace associated with this sync + type: string + format: uuid diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java index c11549207b3a..25516f09e513 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java @@ -20,8 +20,7 @@ public interface NormalizationActivity { @ActivityMethod NormalizationSummary normalize(JobRunConfig jobRunConfig, IntegrationLauncherConfig destinationLauncherConfig, - NormalizationInput input, - final UUID workspaceId); + NormalizationInput input); @ActivityMethod NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index ba8100d0193c..e4e8a1234443 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -71,7 +71,9 @@ public class NormalizationActivityImpl implements NormalizationActivity { private final ResourceRequirements normalizationResourceRequirements; private final AirbyteApiClient airbyteApiClient; + // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); + private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; private static final String STRICT_COMPARISON_IMAGE_TAG = "0.4.0"; private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; @@ -109,8 +111,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt @Override public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig destinationLauncherConfig, - final NormalizationInput input, - final UUID workspaceId) { + final NormalizationInput input) { ApmTraceUtils.addTagsToTrace( Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, destinationLauncherConfig.getDockerImage())); @@ -119,7 +120,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullDestinationConfig = secretsHydrator.hydrate(input.getDestinationConfiguration()); final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); - if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, workspaceId)) { + if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig); } @@ -183,14 +184,15 @@ public NormalizationInput generateNormalizationInput(final StandardSyncInput syn return new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) .withCatalog(syncOutput.getOutputCatalog()) - .withResourceRequirements(normalizationResourceRequirements); + .withResourceRequirements(normalizationResourceRequirements) + .withWorkspaceId(syncInput.getWorkspaceId()); } @VisibleForTesting static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig destinationLauncherConfig) { try { final Version normalizationVersion = new Version(getNormalizationImageTag(destinationLauncherConfig)); - return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1); + return V1_NORMALIZATION_MINOR_VERSION.equals(normalizationVersion.getMinorVersion()); } catch (final IllegalArgumentException e) { // IllegalArgument here means that the version isn't in a semver format. // The current behavior is to assume it supports v0 data types for dev purposes. diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 2107668b319a..df395df38f64 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -140,7 +140,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, LOGGER.info("generating normalization input"); final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput); final NormalizationSummary normalizationSummary = - normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput, syncInput.getWorkspaceId()); + normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) { final OperatorDbtInput operatorDbtInput = new OperatorDbtInput() diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 1af9f1b85f69..f48d5b5f3b07 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -231,8 +231,7 @@ void testSuccess() { doReturn(normalizationSummary).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput, - syncInput.getWorkspaceId()); + normalizationInput); final StandardSyncOutput actualOutput = execute(); @@ -277,8 +276,7 @@ void testReplicationFailedGracefully() { doReturn(normalizationSummary).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput, - syncInput.getWorkspaceId()); + normalizationInput); final StandardSyncOutput actualOutput = execute(); @@ -305,8 +303,7 @@ void testNormalizationFailure() { doThrow(new IllegalArgumentException("induced exception")).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput, - syncInput.getWorkspaceId()); + normalizationInput); assertThrows(WorkflowFailedException.class, this::execute); @@ -353,8 +350,7 @@ void testCancelDuringNormalization() { }).when(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput, - syncInput.getWorkspaceId()); + normalizationInput); assertThrows(WorkflowFailedException.class, this::execute); @@ -461,8 +457,7 @@ private void verifyNormalize(final NormalizationActivity normalizationActivity, verify(normalizationActivity).normalize( JOB_RUN_CONFIG, DESTINATION_LAUNCHER_CONFIG, - normalizationInput, - syncInput.getWorkspaceId()); + normalizationInput); } private static void verifyDbtTransform(final DbtTransformationActivity dbtTransformationActivity, From b5cff45b3d262ed16e59db08b968535d8e18aca6 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 14:54:04 -0800 Subject: [PATCH 10/23] use non-semver tag --- .../workers/temporal/sync/NormalizationActivityImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index e4e8a1234443..1dc225d3c508 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -74,7 +74,7 @@ public class NormalizationActivityImpl implements NormalizationActivity { // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; - private static final String STRICT_COMPARISON_IMAGE_TAG = "0.4.0"; + private static final String STRICT_COMPARISON_IMAGE_TAG = "strict_comparison"; private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, From 5b8feec06d2fc88483b85c138b13f2572a6d852e Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:11:18 -0800 Subject: [PATCH 11/23] add flag for version of normalization --- .../temporal/sync/OrchestratorConstants.java | 1 + .../process/AirbyteIntegrationLauncher.java | 29 +++++++++++-------- .../AirbyteIntegrationLauncherTest.java | 28 ++++++++++-------- .../features/EnvVariableFeatureFlags.java | 6 ++++ .../commons/features/FeatureFlags.java | 5 ++++ .../main/java/io/airbyte/config/Configs.java | 2 ++ .../java/io/airbyte/config/EnvConfigs.java | 6 ++++ ...ontainerOrchestratorConfigBeanFactory.java | 1 + .../temporal/sync/NormalizationActivity.java | 1 - .../sync/NormalizationActivityImpl.java | 7 ++--- .../sync/NormalizationActivityImplTest.java | 17 +++++++---- docker-compose.yaml | 1 + 12 files changed, 70 insertions(+), 34 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 414952d20564..6741b2b79748 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -74,6 +74,7 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, + EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index e8e9224a2a39..f200f416e95c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import datadog.trace.api.Trace; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; @@ -222,18 +223,22 @@ public Process write(final Path jobRoot, private Map getWorkerMetadata() { final Configs configs = new EnvConfigs(); - return Map.of( - WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName, - WorkerEnvConstants.WORKER_JOB_ID, jobId, - WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()), - EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()), - EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces(), - EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces(), - EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit(), - EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest(), - EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()); + // We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert back to hashmap + return Maps.newHashMap( + ImmutableMap.builder() + .put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName) + .put(WorkerEnvConstants.WORKER_JOB_ID, jobId) + .put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt)) + .put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState())) + .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema())) + .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection())) + .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) + .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) + .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) + .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) + .build()); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 4279779a9e08..feb2fbabe2dd 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.config.Configs; @@ -55,18 +56,21 @@ class AirbyteIntegrationLauncherTest { private static final FeatureFlags FEATURE_FLAGS = new EnvVariableFeatureFlags(); private static final Configs CONFIGS = new EnvConfigs(); - private static final Map JOB_METADATA = Map.of( - WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, - WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, - WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()), - EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()), - EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()), - EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces(), - EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces(), - EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest(), - EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit(), - EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()); + private static final Map JOB_METADATA = Maps.newHashMap( + ImmutableMap.builder() + .put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE) + .put(WorkerEnvConstants.WORKER_JOB_ID, JOB_ID) + .put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT)) + .put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState())) + .put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema())) + .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) + .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) + .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) + .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) + .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) + .build() + ); private WorkerConfigs workerConfigs; @Mock diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 363e79c5d313..30c7bdcbc24b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -23,6 +23,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; @Override public boolean autoDisablesFailingConnections() { @@ -71,6 +72,11 @@ public String strictComparisonNormalizationWorkspaces() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg); } + @Override + public String strictComparisonNormalizationTag() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 98ef0a601f44..eb7ab4cf48cf 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -46,4 +46,9 @@ public interface FeatureFlags { */ String strictComparisonNormalizationWorkspaces(); + /** + * @return The Docker image tag representing the normalization version with strict-comparison + */ + String strictComparisonNormalizationTag(); + } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 8d238629af85..53f1f5db2c62 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -753,6 +753,8 @@ public interface Configs { String getStrictComparisonNormalizationWorkspaces(); + String getStrictComparisonNormalizationTag(); + enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 7de8e3c39942..e35297d03282 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -225,6 +225,7 @@ public class EnvConfigs implements Configs { private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), @@ -1152,6 +1153,11 @@ public String getStrictComparisonNormalizationWorkspaces() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, ""); } + @Override + public String getStrictComparisonNormalizationTag() { + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison"); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index d84375c47f67..8cadd5b8175d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -98,6 +98,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()); environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()); + environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java index 25516f09e513..5c301d8fadcd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivity.java @@ -12,7 +12,6 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; -import java.util.UUID; @ActivityInterface public interface NormalizationActivity { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 1dc225d3c508..c6fed36d9ab4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -74,7 +74,6 @@ public class NormalizationActivityImpl implements NormalizationActivity { // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; - private static final String STRICT_COMPARISON_IMAGE_TAG = "strict_comparison"; private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @@ -121,7 +120,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { - activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig); + activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig, featureFlags); } // Check the version of normalization @@ -170,11 +169,11 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, } @VisibleForTesting - static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig) { + static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig, final FeatureFlags featureFlags) { // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're // trying to use a newer version of normalization if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { - replaceNormalizationImageTag(destinationLauncherConfig, STRICT_COMPARISON_IMAGE_TAG); + replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index a167758ca457..3371a63214f9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -5,7 +5,11 @@ package io.airbyte.workers.temporal.sync; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.version.Version; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -17,19 +21,22 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.2.5"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.1.1"))); Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.3.0"))); - Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); - Assertions.assertTrue(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); } @Test void checkNormalizationTagReplacement() { + final FeatureFlags featureFlags = mock(FeatureFlags.class); + when(featureFlags.strictComparisonNormalizationTag()).thenReturn("strict_comparison"); + final IntegrationLauncherConfig config1 = withNormalizationVersion("0.2.25"); - NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config1); - assertEquals("normalization:0.4.0", config1.getNormalizationDockerImage()); + NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config1, featureFlags); + assertEquals("normalization:strict_comparison", config1.getNormalizationDockerImage()); final IntegrationLauncherConfig config2 = withNormalizationVersion("0.2.26"); - NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config2); + NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config2, featureFlags); assertEquals("normalization:0.2.26", config2.getNormalizationDockerImage()); } diff --git a/docker-compose.yaml b/docker-compose.yaml index 2e8e6efc6b85..a8baa3c3ecbf 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -110,6 +110,7 @@ services: - APPLY_FIELD_SELECTION=${APPLY_FIELD_SELECTION} - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} + - STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG} configs: - flags volumes: From beb26bf9c9ec12bbb2006876349b1777faf23a51 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:17:02 -0800 Subject: [PATCH 12/23] also flag old version --- .../commons/temporal/sync/OrchestratorConstants.java | 1 + .../airbyte/workers/process/AirbyteIntegrationLauncher.java | 1 + .../workers/process/AirbyteIntegrationLauncherTest.java | 2 ++ .../airbyte/commons/features/EnvVariableFeatureFlags.java | 6 ++++++ .../main/java/io/airbyte/commons/features/FeatureFlags.java | 6 ++++++ .../src/main/java/io/airbyte/config/Configs.java | 2 ++ .../src/main/java/io/airbyte/config/EnvConfigs.java | 6 ++++++ .../config/ContainerOrchestratorConfigBeanFactory.java | 1 + .../workers/temporal/sync/NormalizationActivityImpl.java | 3 +-- 9 files changed, 26 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 6741b2b79748..bae8c5df92e9 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -75,6 +75,7 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, + EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index f200f416e95c..106beb3dac1b 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -235,6 +235,7 @@ private Map getWorkerMetadata() { .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index feb2fbabe2dd..bbfff7e63834 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -66,6 +66,8 @@ class AirbyteIntegrationLauncherTest { .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag()) + .put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.nonStrictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 30c7bdcbc24b..23bf91235a30 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -24,6 +24,7 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; + public static final String NON_STRICT_COMPARISON_NORMALIZATION_TAG = "NON_STRICT_COMPARISON_NORMALIZATION_TAG"; @Override public boolean autoDisablesFailingConnections() { @@ -77,6 +78,11 @@ public String strictComparisonNormalizationTag() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg); } + @Override + public String nonStrictComparisonNormalizationTag() { + return getEnvOrDefault(NON_STRICT_COMPARISON_NORMALIZATION_TAG, "0.2.25", (arg) -> arg); + } + // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index eb7ab4cf48cf..899568d802c0 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -51,4 +51,10 @@ public interface FeatureFlags { */ String strictComparisonNormalizationTag(); + /** + * @return The Docker image tag representing the normalization version which {@link #strictComparisonNormalizationTag()} + * was branched from + */ + String nonStrictComparisonNormalizationTag(); + } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 53f1f5db2c62..df0214b1ab53 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -754,6 +754,8 @@ public interface Configs { String getStrictComparisonNormalizationWorkspaces(); String getStrictComparisonNormalizationTag(); + String getNonStrictComparisonNormalizationTag(); + enum TrackingStrategy { SEGMENT, diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index e35297d03282..da56b825980c 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -226,6 +226,7 @@ public class EnvConfigs implements Configs { private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; + private static final String NON_STRICT_COMPARISON_NORMALIZATION_TAG = "NON_STRICT_COMPARISON_NORMALIZATION_TAG"; public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), @@ -1158,6 +1159,11 @@ public String getStrictComparisonNormalizationTag() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison"); } + @Override + public String getNonStrictComparisonNormalizationTag() { + return getEnvOrDefault(NON_STRICT_COMPARISON_NORMALIZATION_TAG, "0.2.25"); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index 8cadd5b8175d..f15c02c9563f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -99,6 +99,7 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()); environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()); + environmentVariables.put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.nonStrictComparisonNormalizationTag()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index c6fed36d9ab4..525b01919eee 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -74,7 +74,6 @@ public class NormalizationActivityImpl implements NormalizationActivity { // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; - private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -172,7 +171,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig, final FeatureFlags featureFlags) { // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're // trying to use a newer version of normalization - if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { + if (featureFlags.nonStrictComparisonNormalizationTag().equals(getNormalizationImageTag(destinationLauncherConfig))) { replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); } } From c086301c7339b5fe80283e07c309100c341ab9c5 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:20:43 -0800 Subject: [PATCH 13/23] add test --- .../workers/temporal/sync/NormalizationActivityImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index 3371a63214f9..58d9f19c621f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -24,6 +24,7 @@ void checkNormalizationDataTypesSupportFromVersionString() { Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("0.4.1"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("dev"))); Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("protocolv1"))); + Assertions.assertFalse(NormalizationActivityImpl.normalizationSupportsV1DataTypes(withNormalizationVersion("strict_comparison"))); } @Test From cf3ceff4359dc9f1e68676bc846a61402dbf4e76 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:22:57 -0800 Subject: [PATCH 14/23] missed part of the commit --- docker-compose.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yaml b/docker-compose.yaml index a8baa3c3ecbf..d22d12d1e4b2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -111,6 +111,7 @@ services: - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} - STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG} + - NON_STRICT_COMPARISON_NORMALIZATION_TAG=${NON_STRICT_COMPARISON_NORMALIZATION_TAG} configs: - flags volumes: From 141372cfbb10e5f9003354fac535be74fa701792 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:24:07 -0800 Subject: [PATCH 15/23] format --- .../airbyte/workers/process/AirbyteIntegrationLauncher.java | 3 ++- .../main/java/io/airbyte/commons/features/FeatureFlags.java | 4 ++-- .../src/main/java/io/airbyte/config/Configs.java | 2 +- .../workers/temporal/sync/NormalizationActivityImpl.java | 3 ++- .../workers/temporal/sync/NormalizationActivityImplTest.java | 1 - 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 106beb3dac1b..ee9e227ad02b 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -223,7 +223,8 @@ public Process write(final Path jobRoot, private Map getWorkerMetadata() { final Configs configs = new EnvConfigs(); - // We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert back to hashmap + // We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert + // back to hashmap return Maps.newHashMap( ImmutableMap.builder() .put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 899568d802c0..14e16a83ff88 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -52,8 +52,8 @@ public interface FeatureFlags { String strictComparisonNormalizationTag(); /** - * @return The Docker image tag representing the normalization version which {@link #strictComparisonNormalizationTag()} - * was branched from + * @return The Docker image tag representing the normalization version which + * {@link #strictComparisonNormalizationTag()} was branched from */ String nonStrictComparisonNormalizationTag(); diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index df0214b1ab53..0e79c5cb52cc 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -754,8 +754,8 @@ public interface Configs { String getStrictComparisonNormalizationWorkspaces(); String getStrictComparisonNormalizationTag(); - String getNonStrictComparisonNormalizationTag(); + String getNonStrictComparisonNormalizationTag(); enum TrackingStrategy { SEGMENT, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 525b01919eee..67dfa1e6001c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -168,7 +168,8 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, } @VisibleForTesting - static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig, final FeatureFlags featureFlags) { + static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig, + final FeatureFlags featureFlags) { // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're // trying to use a newer version of normalization if (featureFlags.nonStrictComparisonNormalizationTag().equals(getNormalizationImageTag(destinationLauncherConfig))) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index 58d9f19c621f..291330b1b859 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -9,7 +9,6 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.features.FeatureFlags; -import io.airbyte.commons.version.Version; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; From c807c7a5d5bc2037b1a393d5393ecab91460f7cc Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:36:45 -0800 Subject: [PATCH 16/23] add test for null workspace ID --- .../io/airbyte/commons/features/FeatureFlagHelperTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java index 7846e1e8aea5..c42bfafc3b0b 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/features/FeatureFlagHelperTest.java @@ -29,6 +29,13 @@ void isFieldSelectionEnabledForWorkspaceWithEmptyString() { assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null)); } + @Test + void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() { + when(featureFlags.fieldSelectionWorkspaces()).thenReturn(""); + + assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null)); + } + @Test void isFieldSelectionEnabledForWorkspaceWithSpaceString() { when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" "); From d8c5805a9bad948378a78e4170d811b47147e399 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:53:43 -0800 Subject: [PATCH 17/23] Revert "also flag old version" This reverts commit 3be601d16cbc929678732462ae7f5c7ddfe5ef37. --- .../commons/temporal/sync/OrchestratorConstants.java | 1 - .../airbyte/workers/process/AirbyteIntegrationLauncher.java | 1 - .../workers/process/AirbyteIntegrationLauncherTest.java | 2 -- .../airbyte/commons/features/EnvVariableFeatureFlags.java | 6 ------ .../main/java/io/airbyte/commons/features/FeatureFlags.java | 6 ------ .../src/main/java/io/airbyte/config/Configs.java | 2 -- .../src/main/java/io/airbyte/config/EnvConfigs.java | 6 ------ .../config/ContainerOrchestratorConfigBeanFactory.java | 1 - .../workers/temporal/sync/NormalizationActivityImpl.java | 3 ++- 9 files changed, 2 insertions(+), 26 deletions(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index bae8c5df92e9..6741b2b79748 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -75,7 +75,6 @@ public class OrchestratorConstants { EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, - EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAG_CLIENT, FEATURE_FLAG_PATH, EnvConfigs.LAUNCHDARKLY_KEY, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index ee9e227ad02b..6fb3934c3dcd 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -236,7 +236,6 @@ private Map getWorkerMetadata() { .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) - .put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index bbfff7e63834..feb2fbabe2dd 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -66,8 +66,6 @@ class AirbyteIntegrationLauncherTest { .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag()) - .put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.nonStrictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 23bf91235a30..30c7bdcbc24b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -24,7 +24,6 @@ public class EnvVariableFeatureFlags implements FeatureFlags { public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; - public static final String NON_STRICT_COMPARISON_NORMALIZATION_TAG = "NON_STRICT_COMPARISON_NORMALIZATION_TAG"; @Override public boolean autoDisablesFailingConnections() { @@ -78,11 +77,6 @@ public String strictComparisonNormalizationTag() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg); } - @Override - public String nonStrictComparisonNormalizationTag() { - return getEnvOrDefault(NON_STRICT_COMPARISON_NORMALIZATION_TAG, "0.2.25", (arg) -> arg); - } - // TODO: refactor in order to use the same method than the ones in EnvConfigs.java public T getEnvOrDefault(final String key, final T defaultValue, final Function parser) { final String value = System.getenv(key); diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index 14e16a83ff88..eb7ab4cf48cf 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -51,10 +51,4 @@ public interface FeatureFlags { */ String strictComparisonNormalizationTag(); - /** - * @return The Docker image tag representing the normalization version which - * {@link #strictComparisonNormalizationTag()} was branched from - */ - String nonStrictComparisonNormalizationTag(); - } diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 0e79c5cb52cc..53f1f5db2c62 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -755,8 +755,6 @@ public interface Configs { String getStrictComparisonNormalizationTag(); - String getNonStrictComparisonNormalizationTag(); - enum TrackingStrategy { SEGMENT, LOGGING diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index da56b825980c..e35297d03282 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -226,7 +226,6 @@ public class EnvConfigs implements Configs { private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; - private static final String NON_STRICT_COMPARISON_NORMALIZATION_TAG = "NON_STRICT_COMPARISON_NORMALIZATION_TAG"; public static final Map> JOB_SHARED_ENVS = Map.of( AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(), @@ -1159,11 +1158,6 @@ public String getStrictComparisonNormalizationTag() { return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison"); } - @Override - public String getNonStrictComparisonNormalizationTag() { - return getEnvOrDefault(NON_STRICT_COMPARISON_NORMALIZATION_TAG, "0.2.25"); - } - @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java index f15c02c9563f..8cadd5b8175d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ContainerOrchestratorConfigBeanFactory.java @@ -99,7 +99,6 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig( environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()); environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()); - environmentVariables.put(EnvVariableFeatureFlags.NON_STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.nonStrictComparisonNormalizationTag()); environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts); environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint); environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 67dfa1e6001c..9528360f9e6f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -74,6 +74,7 @@ public class NormalizationActivityImpl implements NormalizationActivity { // This constant is not currently in use. We'll need to bump it when we try releasing v1 again. private static final Version MINIMAL_VERSION_FOR_DATATYPES_V1 = new Version("0.3.0"); private static final String V1_NORMALIZATION_MINOR_VERSION = "3"; + private static final String NON_STRICT_COMPARISON_IMAGE_TAG = "0.2.25"; public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Optional containerOrchestratorConfig, @Named("defaultWorkerConfigs") final WorkerConfigs workerConfigs, @@ -172,7 +173,7 @@ static void activateStrictNormalizationComparisonIfPossible(final IntegrationLau final FeatureFlags featureFlags) { // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're // trying to use a newer version of normalization - if (featureFlags.nonStrictComparisonNormalizationTag().equals(getNormalizationImageTag(destinationLauncherConfig))) { + if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); } } From ddb8ece5981b35ab413cee814669d1806eb6b561 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:53:50 -0800 Subject: [PATCH 18/23] Revert "missed part of the commit" This reverts commit 47a67b4631cb359e3b4caa0bd75e7848fae9c665. --- docker-compose.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index d22d12d1e4b2..a8baa3c3ecbf 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -111,7 +111,6 @@ services: - FIELD_SELECTION_WORKSPACES=${FIELD_SELECTION_WORKSPACES} - STRICT_COMPARISON_NORMALIZATION_WORKSPACES=${STRICT_COMPARISON_NORMALIZATION_WORKSPACES} - STRICT_COMPARISON_NORMALIZATION_TAG=${STRICT_COMPARISON_NORMALIZATION_TAG} - - NON_STRICT_COMPARISON_NORMALIZATION_TAG=${NON_STRICT_COMPARISON_NORMALIZATION_TAG} configs: - flags volumes: From de78b07968e4e0864ebe878226ef2e5e5d48f3f7 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 16:57:22 -0800 Subject: [PATCH 19/23] always apply flag, even if we're behind a version --- .../temporal/sync/NormalizationActivityImpl.java | 12 +----------- .../temporal/sync/NormalizationActivityImplTest.java | 10 +++------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 9528360f9e6f..18373169be49 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -120,7 +120,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { - activateStrictNormalizationComparisonIfPossible(destinationLauncherConfig, featureFlags); + replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); } // Check the version of normalization @@ -168,16 +168,6 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, () -> context); } - @VisibleForTesting - static void activateStrictNormalizationComparisonIfPossible(final IntegrationLauncherConfig destinationLauncherConfig, - final FeatureFlags featureFlags) { - // Strict comparison was branched from normalization 0.2.25, so we shouldn't apply it if we're - // trying to use a newer version of normalization - if (NON_STRICT_COMPARISON_IMAGE_TAG.equals(getNormalizationImageTag(destinationLauncherConfig))) { - replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); - } - } - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, final StandardSyncOutput syncOutput) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java index 291330b1b859..cc08695d8b61 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/NormalizationActivityImplTest.java @@ -31,13 +31,9 @@ void checkNormalizationTagReplacement() { final FeatureFlags featureFlags = mock(FeatureFlags.class); when(featureFlags.strictComparisonNormalizationTag()).thenReturn("strict_comparison"); - final IntegrationLauncherConfig config1 = withNormalizationVersion("0.2.25"); - NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config1, featureFlags); - assertEquals("normalization:strict_comparison", config1.getNormalizationDockerImage()); - - final IntegrationLauncherConfig config2 = withNormalizationVersion("0.2.26"); - NormalizationActivityImpl.activateStrictNormalizationComparisonIfPossible(config2, featureFlags); - assertEquals("normalization:0.2.26", config2.getNormalizationDockerImage()); + final IntegrationLauncherConfig config = withNormalizationVersion("0.2.25"); + NormalizationActivityImpl.replaceNormalizationImageTag(config, "strict_comparison"); + assertEquals("normalization:strict_comparison", config.getNormalizationDockerImage()); } private IntegrationLauncherConfig withNormalizationVersion(final String version) { From 0fb728a1a3f815114a5b1be8c923fcc6b684f504 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 6 Feb 2023 19:19:02 -0800 Subject: [PATCH 20/23] derp --- .../io/airbyte/workers/process/AirbyteIntegrationLauncher.java | 2 +- .../airbyte/workers/process/AirbyteIntegrationLauncherTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 6fb3934c3dcd..2ef62547b931 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -235,7 +235,7 @@ private Map getWorkerMetadata() { .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces()) - .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey()) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index feb2fbabe2dd..7431d0488f78 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -66,6 +66,7 @@ class AirbyteIntegrationLauncherTest { .put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection())) .put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces()) .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces()) + .put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag()) .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) From 3891265428b912239ff9606557111861cd1df150 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 7 Feb 2023 10:05:20 -0800 Subject: [PATCH 21/23] Add more logging to the normalization activity --- .../workers/temporal/sync/NormalizationActivityImpl.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index 18373169be49..4f96db8b5941 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -52,8 +52,10 @@ import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; @Singleton +@Slf4j public class NormalizationActivityImpl implements NormalizationActivity { private final Optional containerOrchestratorConfig; @@ -120,6 +122,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final var fullInput = Jsons.clone(input).withDestinationConfiguration(fullDestinationConfig); if (FeatureFlagHelper.isStrictComparisonNormalizationEnabledForWorkspace(featureFlags, input.getWorkspaceId())) { + log.info("Using strict comparison normalization"); replaceNormalizationImageTag(destinationLauncherConfig, featureFlags.strictComparisonNormalizationTag()); } @@ -128,6 +131,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // all columns being typed as JSONB. If normalization is using an older version, fallback to using // v0 data types. if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) { + log.info("Using protocol v0"); CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog()); } else { @@ -136,6 +140,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, // phase v0 out. // Performance impact should be low considering the nature of the check compared to the time to run // normalization. + log.info("Using protocol v1"); CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog()); } @@ -146,6 +151,7 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig, final CheckedSupplier, Exception> workerFactory; + log.info("Using normalization: " + destinationLauncherConfig.getNormalizationDockerImage()); if (containerOrchestratorConfig.isPresent()) { workerFactory = getContainerLauncherWorkerFactory(workerConfigs, destinationLauncherConfig, jobRunConfig, () -> context); From 6e9534996e458f5f36c4190390fa797623173bb0 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 7 Feb 2023 10:05:55 -0800 Subject: [PATCH 22/23] Update charts and kustomize for the feature flag --- charts/airbyte-worker/templates/deployment.yaml | 10 ++++++++++ kube/resources/worker.yaml | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/charts/airbyte-worker/templates/deployment.yaml b/charts/airbyte-worker/templates/deployment.yaml index af9517b02f69..e37bf9af02e3 100644 --- a/charts/airbyte-worker/templates/deployment.yaml +++ b/charts/airbyte-worker/templates/deployment.yaml @@ -310,6 +310,16 @@ spec: configMapKeyRef: name: {{ .Release.Name }}-airbyte-env key: USE_STREAM_CAPABLE_STATE + - name: STRICT_COMPARISON_NORMALIZATION_TAG + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_TAG + - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index d8f7be704011..38cf19fe268d 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -280,6 +280,16 @@ spec: configMapKeyRef: name: airbyte-env key: USE_STREAM_CAPABLE_STATE + - name: STRICT_COMPARISON_NORMALIZATION_TAG + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_TAG + - name: STRICT_COMPARISON_NORMALIZATION_WORKSPACES + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STRICT_COMPARISON_NORMALIZATION_WORKSPACES - name: SHOULD_RUN_NOTIFY_WORKFLOWS valueFrom: configMapKeyRef: From 524308bfb6a301315eea934eff11295fe390b0af Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 7 Feb 2023 11:08:28 -0800 Subject: [PATCH 23/23] Format --- .../workers/process/AirbyteIntegrationLauncherTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 7431d0488f78..a50530849bb8 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -70,8 +70,7 @@ class AirbyteIntegrationLauncherTest { .put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest()) .put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit()) .put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey()) - .build() - ); + .build()); private WorkerConfigs workerConfigs; @Mock