Skip to content

Commit

Permalink
Actually apply Socat CPU override for experimentation (#7534)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 28, 2023
1 parent 8d04286 commit 217bbd9
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,11 @@

package io.airbyte.workers.orchestrator;

import static io.airbyte.config.EnvConfigs.SOCAT_KUBE_CPU_LIMIT;
import static io.airbyte.config.EnvConfigs.SOCAT_KUBE_CPU_REQUEST;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.ConcurrentSocatResources;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.ContainerOrchestratorDevImage;
import io.airbyte.featureflag.ContainerOrchestratorJavaOpts;
Expand All @@ -27,7 +23,6 @@
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import io.micronaut.core.util.StringUtils;
import io.temporal.activity.ActivityExecutionContext;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -85,7 +80,8 @@ public CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
activityContext,
serverPort,
temporalUtils,
workerConfigs);
workerConfigs,
featureFlagClient);
}

/**
Expand Down Expand Up @@ -114,14 +110,6 @@ static ContainerOrchestratorConfig injectContainerOrchestratorConfig(final Featu
envMap.put("JAVA_OPTS", injectedJavaOpts);
}

// Allow for the override of the socat pod CPU resources as part of the concurrent source read
// experimentation
final String socatResources = client.stringVariation(ConcurrentSocatResources.INSTANCE, new Connection(connectionId));
if (StringUtils.isNotEmpty(socatResources)) {
envMap.put(SOCAT_KUBE_CPU_LIMIT, socatResources);
envMap.put(SOCAT_KUBE_CPU_REQUEST, socatResources);
}

// This is messy because the ContainerOrchestratorConfig is immutable, so we alwasy have to create
// an
// entirely new object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand All @@ -35,7 +36,8 @@ public DbtLauncherWorker(final UUID connectionId,
final ContainerOrchestratorConfig containerOrchestratorConfig,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final FeatureFlagClient featureFlagClient) {
super(
connectionId,
DBT,
Expand All @@ -50,6 +52,7 @@ public DbtLauncherWorker(final UUID connectionId,
serverPort,
temporalUtils,
workerConfigs,
featureFlagClient,
// Custom connector does not use Dbt at this moment, thus this flag for runnning job under
// isolated pool can be set to false.
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.workers.sync;

import static io.airbyte.config.EnvConfigs.SOCAT_KUBE_CPU_LIMIT;
import static io.airbyte.config.EnvConfigs.SOCAT_KUBE_CPU_REQUEST;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
import static io.airbyte.workers.process.Metadata.CONNECTION_ID_LABEL_KEY;
Expand All @@ -15,6 +17,9 @@
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.featureflag.ConcurrentSocatResources;
import io.airbyte.featureflag.Connection;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand All @@ -31,6 +36,7 @@
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.micronaut.core.util.StringUtils;
import io.temporal.activity.ActivityExecutionContext;
import java.nio.file.Path;
import java.time.Duration;
Expand All @@ -44,6 +50,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Coordinates configuring and managing the state of an async process. This is tied to the (job_id,
Expand All @@ -55,6 +63,8 @@
@Slf4j
public abstract class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {

private static final Logger LOGGER = LoggerFactory.getLogger(LauncherWorker.class);

private static final Duration MAX_DELETION_TIMEOUT = Duration.ofSeconds(45);

/**
Expand All @@ -79,6 +89,7 @@ public abstract class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUT
private final boolean isCustomConnector;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private AsyncOrchestratorPodProcess process;
private final FeatureFlagClient featureFlagClient;

public LauncherWorker(final UUID connectionId,
final String application,
Expand All @@ -92,6 +103,7 @@ public LauncherWorker(final UUID connectionId,
final Integer serverPort,
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs,
final FeatureFlagClient featureFlagClient,
final boolean isCustomConnector) {

this.connectionId = connectionId;
Expand All @@ -106,6 +118,7 @@ public LauncherWorker(final UUID connectionId,
this.serverPort = serverPort;
this.temporalUtils = temporalUtils;
this.workerConfigs = workerConfigs;
this.featureFlagClient = featureFlagClient;
this.isCustomConnector = isCustomConnector;

// Generate a random UUID to unique identify the pod process
Expand All @@ -130,6 +143,15 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
// Merge in the env from the ContainerOrchestratorConfig
containerOrchestratorConfig.environmentVariables().entrySet().stream().forEach(e -> envMap.putIfAbsent(e.getKey(), e.getValue()));

// Allow for the override of the socat pod CPU resources as part of the concurrent source read
// experimentation
final String socatResources = featureFlagClient.stringVariation(ConcurrentSocatResources.INSTANCE, new Connection(connectionId));
if (StringUtils.isNotEmpty(socatResources)) {
LOGGER.info("Overriding Socat CPU limit and request to {}.", socatResources);
envMap.put(SOCAT_KUBE_CPU_LIMIT, socatResources);
envMap.put(SOCAT_KUBE_CPU_REQUEST, socatResources);
}

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, application,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand All @@ -36,7 +37,8 @@ public NormalizationLauncherWorker(final UUID connectionId,
final ContainerOrchestratorConfig containerOrchestratorConfig,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final FeatureFlagClient featureFlagClient) {
super(
connectionId,
NORMALIZATION,
Expand All @@ -51,6 +53,7 @@ public NormalizationLauncherWorker(final UUID connectionId,
serverPort,
temporalUtils,
workerConfigs,
featureFlagClient,
// Normalization process will happen only on a fixed set of connectors,
// thus they are not going to be run under custom connectors. Setting this to false.
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
Expand Down Expand Up @@ -42,7 +43,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {
final WorkerConfigs workerConfigs,
final FeatureFlagClient featureFlagClient) {
super(
connectionId,
REPLICATION,
Expand All @@ -58,6 +60,7 @@ public ReplicationLauncherWorker(final UUID connectionId,
serverPort,
temporalUtils,
workerConfigs,
featureFlagClient,
sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector());
}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object RefreshSchemaPeriod : Temporary<Int>(key= "refreshSchema.period.hours", d

object ConcurrentSourceStreamRead : Temporary<Boolean>(key = "concurrent.source.stream.read", default = false)

object ConcurrentSocatResources : Temporary<String>(key = "concurrent.socat.resources", default = "0.5")
object ConcurrentSocatResources : Temporary<String>(key = "concurrent.socat.resources", default = "")

object ReplicationWorkerImpl : Permanent<String>(key = "platform.replication-worker-impl", default = "default")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.OssMetricsRegistry;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class DbtTransformationActivityImpl implements DbtTransformationActivity
private final AirbyteConfigValidator airbyteConfigValidator;
private final TemporalUtils temporalUtils;
private final AirbyteApiClient airbyteApiClient;
private final FeatureFlagClient featureFlagClient;

public DbtTransformationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
final WorkerConfigsProvider workerConfigsProvider,
Expand All @@ -79,7 +81,8 @@ public DbtTransformationActivityImpl(@Named("containerOrchestratorConfig") final
@Value("${micronaut.server.port}") final Integer serverPort,
final AirbyteConfigValidator airbyteConfigValidator,
final TemporalUtils temporalUtils,
final AirbyteApiClient airbyteApiClient) {
final AirbyteApiClient airbyteApiClient,
final FeatureFlagClient featureFlagClient) {
this.containerOrchestratorConfig = containerOrchestratorConfig;
this.workerConfigsProvider = workerConfigsProvider;
this.processFactory = processFactory;
Expand All @@ -92,6 +95,7 @@ public DbtTransformationActivityImpl(@Named("containerOrchestratorConfig") final
this.airbyteConfigValidator = airbyteConfigValidator;
this.temporalUtils = temporalUtils;
this.airbyteApiClient = airbyteApiClient;
this.featureFlagClient = featureFlagClient;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -173,7 +177,8 @@ private CheckedSupplier<Worker<OperatorDbtInput, Void>, Exception> getContainerL
containerOrchestratorConfig.get(),
activityContext,
serverPort,
temporalUtils);
temporalUtils,
featureFlagClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.OssMetricsRegistry;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class NormalizationActivityImpl implements NormalizationActivity {
private final AirbyteConfigValidator airbyteConfigValidator;
private final TemporalUtils temporalUtils;
private final AirbyteApiClient airbyteApiClient;
private final FeatureFlagClient featureFlagClient;

private static final String V1_NORMALIZATION_MINOR_VERSION = "3";

Expand All @@ -90,7 +92,8 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
@Value("${micronaut.server.port}") final Integer serverPort,
final AirbyteConfigValidator airbyteConfigValidator,
final TemporalUtils temporalUtils,
final AirbyteApiClient airbyteApiClient) {
final AirbyteApiClient airbyteApiClient,
final FeatureFlagClient featureFlagClient) {
this.containerOrchestratorConfig = containerOrchestratorConfig;
this.workerConfigsProvider = workerConfigsProvider;
this.processFactory = processFactory;
Expand All @@ -103,6 +106,7 @@ public NormalizationActivityImpl(@Named("containerOrchestratorConfig") final Opt
this.airbyteConfigValidator = airbyteConfigValidator;
this.temporalUtils = temporalUtils;
this.airbyteApiClient = airbyteApiClient;
this.featureFlagClient = featureFlagClient;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -265,7 +269,8 @@ private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Except
containerOrchestratorConfig.get(),
activityContext,
serverPort,
temporalUtils);
temporalUtils,
featureFlagClient);
}

}
2 changes: 1 addition & 1 deletion flags.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ flags:
- name: concurrent.source.stream.read
serve: false
- name: concurrent.socat.resources
serve: "0.5"
serve: ""
- name: platform.add-scheduling-jitter
serve: false

0 comments on commit 217bbd9

Please sign in to comment.