Skip to content

Commit

Permalink
revert: "revert: "chore: job Run config as env" (#13317)" (#13319)
Browse files Browse the repository at this point in the history
Co-authored-by: Ryan Br... <ryan.broughan@gmail.com>
  • Loading branch information
benmoriceau and tryangul committed Jul 31, 2024
1 parent b44b203 commit 52ab551
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,14 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
// Manually add the worker environment to the env var map
envMap.put(WorkerConstants.WORKER_ENVIRONMENT, containerOrchestratorConfig.workerEnvironment().name());
envMap.put(WorkerConstants.WORKER_APPLICATION, ReplicationLauncherWorker.REPLICATION);
envMap.put(WorkerConstants.JOB_ID, jobRunConfig.getJobId());
envMap.put(WorkerConstants.ATTEMPT_ID, jobRunConfig.getAttemptId().toString());

// Merge in the env from the ContainerOrchestratorConfig
containerOrchestratorConfig.environmentVariables().entrySet().stream().forEach(e -> envMap.putIfAbsent(e.getKey(), e.getValue()));

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, Jsons.serialize(jobRunConfig),
OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(input)));

final Map<Integer, Integer> portMap = Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public class WorkerConstants {

public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";
public static final String WORKER_APPLICATION = "WORKER_APPLICATION";
public static final String ATTEMPT_ID = "ATTEMPT_ID";
public static final String JOB_ID = "JOB_ID";

public static final String DD_ENV_VAR = "-XX:+ExitOnOutOfMemoryError "
+ "-XX:MaxRAMPercentage=75.0 "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package io.airbyte.commons.envvar
enum class EnvVar {
AIRBYTE_ROLE,
AIRBYTE_VERSION,
ATTEMPT_ID,
AWS_ACCESS_KEY_ID,
AWS_ASSUME_ROLE_SECRET_ACCESS_KEY,
AWS_ASSUME_ROLE_SECRET_NAME,
Expand Down Expand Up @@ -44,6 +45,7 @@ enum class EnvVar {
JOB_DEFAULT_ENV_MAP,
JOB_ERROR_REPORTING_SENTRY_DSN,
JOB_ERROR_REPORTING_STRATEGY,
JOB_ID,
JOB_ISOLATED_KUBE_NODE_SELECTORS,
JOB_KUBE_ANNOTATIONS,
JOB_KUBE_BUSYBOX_IMAGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.sync.OrchestratorConstants;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import jakarta.annotation.Nullable;
Expand Down Expand Up @@ -42,13 +41,14 @@ String configDir(@Value("${airbyte.config-dir}") @Nullable final String configDi
/**
* Returns the contents of the OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG file.
*
* @param configDir Which directory contains the OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG
* file.
* @param jobId Which job is being run.
* @param attemptId Which attempt of the job is being run.
* @return Contents of OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG
*/
@Singleton
JobRunConfig jobRunConfig(@Named("configDir") final String configDir) {
return Jsons.deserialize(Path.of(configDir, OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG).toFile(), JobRunConfig.class);
JobRunConfig jobRunConfig(@Value("${airbyte.job-id}") @Nullable final String jobId,
@Value("${airbyte.attempt-id}") @Nullable final long attemptId) {
return new JobRunConfig().withJobId(jobId).withAttemptId(attemptId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ micronaut:

airbyte:
application: ${WORKER_APPLICATION:replication-orchestrator}
job-id: ${JOB_ID:}
attempt-id: ${ATTEMPT_ID:}
deployment-mode: ${DEPLOYMENT_MODE:OSS}
role: ${AIRBYTE_ROLE:dev}
version: ${AIRBYTE_VERSION:dev}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import io.airbyte.featureflag.InjectAwsSecretsToConnectorPods
import io.airbyte.featureflag.OrchestratorFetchesInputFromInit
import io.airbyte.featureflag.Workspace
import io.airbyte.persistence.job.models.IntegrationLauncherConfig
import io.airbyte.persistence.job.models.JobRunConfig
import io.airbyte.persistence.job.models.ReplicationInput
import io.airbyte.workers.models.CheckConnectionInput
import io.airbyte.workers.models.DiscoverCatalogInput
Expand Down Expand Up @@ -71,7 +70,7 @@ class PayloadKubeInputMapper(
val orchestratorReqs = input.getOrchestratorResourceReqs()
val nodeSelectors = getNodeSelectors(input.usesCustomConnector(), replicationWorkerConfigs)

val fileMap = buildSyncFileMap(input, input.jobRunConfig)
val fileMap = buildSyncFileMap(input)

return OrchestratorKubeInput(
labeler.getReplicationOrchestratorLabels() + sharedLabels,
Expand All @@ -82,7 +81,11 @@ class PayloadKubeInputMapper(
fileMap,
orchestratorReqs,
replicationWorkerConfigs.workerKubeAnnotations,
listOf(EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null)),
listOf(
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null),
EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null),
EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null),
),
)
}

Expand Down Expand Up @@ -246,15 +249,11 @@ class PayloadKubeInputMapper(

// TODO: This is the way we pass data into the pods we launch. This should be extracted to
// some shared interface between parent / child to make it less brittle.
private fun buildSyncFileMap(
input: ReplicationInput,
jobRunConfig: JobRunConfig,
): Map<String, String> {
private fun buildSyncFileMap(input: ReplicationInput): Map<String, String> {
return buildMap {
if (!featureFlagClient.boolVariation(OrchestratorFetchesInputFromInit, Connection(input.connectionId))) {
put(OrchestratorConstants.INIT_FILE_INPUT, serializer.serialize(input))
}
put(OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, serializer.serialize(jobRunConfig))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,21 @@ class PayloadKubeInputMapperTest {
assert(result.kubePodInfo == KubePodInfo(namespace, "orchestrator-repl-job-415-attempt-7654", containerInfo))
val expectedFileMap: Map<String, String> =
buildMap {
put(OrchestratorConstants.INIT_FILE_JOB_RUN_CONFIG, mockSerializedOutput)
if (shouldKubeCpInput) {
put(OrchestratorConstants.INIT_FILE_INPUT, mockSerializedOutput)
}
}

assert(result.fileMap == expectedFileMap)
assert(result.resourceReqs == resourceReqs)
assert(result.extraEnv == listOf(EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null)))
assert(
result.extraEnv ==
listOf(
EnvVar(AirbyteEnvVar.WORKLOAD_ID.toString(), workloadId, null),
EnvVar(AirbyteEnvVar.JOB_ID.toString(), jobId, null),
EnvVar(AirbyteEnvVar.ATTEMPT_ID.toString(), attemptId.toString(), null),
),
)
}

@ParameterizedTest
Expand Down

0 comments on commit 52ab551

Please sign in to comment.