Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add normalization strict incremental feature flag #22514

Merged
merged 25 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class OrchestratorConstants {
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA,
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION,
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES,
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG,
FEATURE_FLAG_CLIENT,
FEATURE_FLAG_PATH,
EnvConfigs.LAUNCHDARKLY_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
Expand Down Expand Up @@ -222,17 +223,23 @@ public Process write(final Path jobRoot,

private Map<String, String> getWorkerMetadata() {
final Configs configs = new EnvConfigs();
return Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces(),
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit(),
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest(),
EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey());
// We've managed to exceed the maximum number of parameters for Map.of(), so use a builder + convert
// back to hashmap
return Maps.newHashMap(
ImmutableMap.<String, String>builder()
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName)
.put(WorkerEnvConstants.WORKER_JOB_ID, jobId)
.put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt))
.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()))
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, featureFlags.strictComparisonNormalizationWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, configs.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, configs.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.LAUNCHDARKLY_KEY, configs.getLaunchDarklyKey())
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs;
Expand Down Expand Up @@ -55,17 +56,21 @@ class AirbyteIntegrationLauncherTest {
private static final FeatureFlags FEATURE_FLAGS = new EnvVariableFeatureFlags();
private static final Configs CONFIGS = new EnvConfigs();

private static final Map<String, String> JOB_METADATA = Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces(),
EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest(),
EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit(),
EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey());
private static final Map<String, String> JOB_METADATA = Maps.newHashMap(
ImmutableMap.<String, String>builder()
.put(WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE)
.put(WorkerEnvConstants.WORKER_JOB_ID, JOB_ID)
.put(WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT))
.put(EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(FEATURE_FLAGS.useStreamCapableState()))
.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(FEATURE_FLAGS.autoDetectSchema()))
.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(FEATURE_FLAGS.applyFieldSelection()))
.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, FEATURE_FLAGS.fieldSelectionWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES, FEATURE_FLAGS.strictComparisonNormalizationWorkspaces())
.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, FEATURE_FLAGS.strictComparisonNormalizationTag())
.put(EnvConfigs.SOCAT_KUBE_CPU_REQUEST, CONFIGS.getSocatSidecarKubeCpuRequest())
.put(EnvConfigs.SOCAT_KUBE_CPU_LIMIT, CONFIGS.getSocatSidecarKubeCpuLimit())
.put(EnvConfigs.LAUNCHDARKLY_KEY, CONFIGS.getLaunchDarklyKey())
.build());

private WorkerConfigs workerConfigs;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ public class EnvVariableFeatureFlags implements FeatureFlags {

public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down Expand Up @@ -64,6 +67,16 @@ public String fieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "", (arg) -> arg);
}

@Override
public String strictComparisonNormalizationWorkspaces() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg);
}

@Override
public String strictComparisonNormalizationTag() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison", (arg) -> arg);
}

// TODO: refactor in order to use the same method than the ones in EnvConfigs.java
public <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
final String value = System.getenv(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,43 @@

package io.airbyte.commons.features;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FeatureFlagHelper {

public static boolean isFieldSelectionEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
final String workspaceIdsString = featureFlags.fieldSelectionWorkspaces();
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, "field selection")
|| featureFlags.applyFieldSelection();
}

public static boolean isStrictComparisonNormalizationEnabledForWorkspace(final FeatureFlags featureFlags, final UUID workspaceId) {
return isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::strictComparisonNormalizationWorkspaces, workspaceId,
"strict comparison in normalization");
}

@VisibleForTesting
static boolean isWorkspaceIncludedInFlag(final FeatureFlags featureFlags,
final Function<FeatureFlags, String> flagRetriever,
final UUID workspaceId,
final String context) {
final String workspaceIdsString = flagRetriever.apply(featureFlags);
final Set<UUID> workspaceIds = new HashSet<>();
if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) {
for (final String id : workspaceIdsString.split(",")) {
try {
workspaceIds.add(UUID.fromString(id));
} catch (final IllegalArgumentException e) {
log.warn("Malformed workspace id for field selection: {}", id);
log.warn("Malformed workspace id for {}: {}", context, id);
}
}
}
if (workspaceId != null && workspaceIds.contains(workspaceId)) {
return true;
}

return featureFlags.applyFieldSelection();
return workspaceId != null && workspaceIds.contains(workspaceId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,18 @@ public interface FeatureFlags {
*/
String fieldSelectionWorkspaces();

/**
* Get the workspaces allow-listed for strict incremental comparison in normalization. This takes
* precedence over the normalization version in destination_definitions.yaml.
*
* @return a comma-separated list of workspace ids where strict incremental comparison should be
* enabled in normalization.
*/
String strictComparisonNormalizationWorkspaces();

/**
* @return The Docker image tag representing the normalization version with strict-comparison
*/
String strictComparisonNormalizationTag();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,47 @@ void beforeEach() {
void isFieldSelectionEnabledForWorkspaceWithEmptyString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForNullWorkspaceWithEmptyString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn("");

assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, null, null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSpaceString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(" ");

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithNullString() {
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(null);

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, UUID.randomUUID()));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, UUID.randomUUID(), null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndAMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId.toString() + "," + workspaceId.toString());
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId + "," + workspaceId);

assertTrue(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
assertTrue(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
}

@Test
void isFieldSelectionEnabledForWorkspaceWithSomeIdsAndNoMatch() {
final UUID workspaceId = UUID.randomUUID();
final UUID randomId1 = UUID.randomUUID();
final UUID randomId2 = UUID.randomUUID();
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1.toString() + "," + randomId2.toString());
when(featureFlags.fieldSelectionWorkspaces()).thenReturn(randomId1 + "," + randomId2);

assertFalse(FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
assertFalse(FeatureFlagHelper.isWorkspaceIncludedInFlag(featureFlags, FeatureFlags::fieldSelectionWorkspaces, workspaceId, null));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,10 @@ public interface Configs {

String getFieldSelectionWorkspaces();

String getStrictComparisonNormalizationWorkspaces();

String getStrictComparisonNormalizationTag();

enum TrackingStrategy {
SEGMENT,
LOGGING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ public class EnvConfigs implements Configs {
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
private static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

private static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
private static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";

public static final Map<String, Function<EnvConfigs, String>> JOB_SHARED_ENVS = Map.of(
AIRBYTE_VERSION, (instance) -> instance.getAirbyteVersion().serialize(),
AIRBYTE_ROLE, EnvConfigs::getAirbyteRole,
Expand Down Expand Up @@ -1145,6 +1148,16 @@ public String getFieldSelectionWorkspaces() {
return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "");
}

@Override
public String getStrictComparisonNormalizationWorkspaces() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "");
}

@Override
public String getStrictComparisonNormalizationTag() {
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison");
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ properties:
type: object
description: optional resource requirements to run sync workers
existingJavaType: io.airbyte.config.ResourceRequirements
workspaceId:
description: The id of the workspace associated with this sync
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public ContainerOrchestratorConfig kubernetesContainerOrchestratorConfig(
environmentVariables.put(EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, Boolean.toString(featureFlags.autoDetectSchema()));
environmentVariables.put(EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, Boolean.toString(featureFlags.applyFieldSelection()));
environmentVariables.put(EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_WORKSPACES,
featureFlags.strictComparisonNormalizationWorkspaces());
environmentVariables.put(EnvVariableFeatureFlags.STRICT_COMPARISON_NORMALIZATION_TAG, featureFlags.strictComparisonNormalizationTag());
environmentVariables.put(JAVA_OPTS_ENV_VAR, containerOrchestratorJavaOpts);
environmentVariables.put(CONTROL_PLANE_AUTH_ENDPOINT_ENV_VAR, controlPlaneAuthEndpoint);
environmentVariables.put(DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH_ENV_VAR, dataPlaneServiceAccountCredentialsPath);
Expand Down
Loading