From 0333e6836ac15de32317bc93ed47bc964c4e3ea3 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 6 Aug 2022 14:09:01 -0400 Subject: [PATCH 1/8] Set the image pull policy on the container orchestration container to the main job image pull policy --- .../java/io/airbyte/workers/WorkerApp.java | 1 + .../process/AsyncOrchestratorPodProcess.java | 4 + .../workers/temporal/sync/LauncherWorker.java | 1 + ...OrchestratorPodProcessIntegrationTest.java | 74 ++++++++++--------- 4 files changed, 46 insertions(+), 34 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c4c75b3346e4..70c92105e214 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -375,6 +375,7 @@ static Optional getContainerOrchestratorConfig(fina configs.getContainerOrchestratorSecretName(), configs.getContainerOrchestratorSecretMountPath(), configs.getContainerOrchestratorImage(), + configs.getJobKubeMainContainerImagePullPolicy(), configs.getGoogleApplicationCredentials())); } else { return Optional.empty(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index ac79c44c0dfc..cf0bc25129c4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -60,6 +60,7 @@ public class AsyncOrchestratorPodProcess implements KubePod { private final String secretName; private final String secretMountPath; private final String containerOrchestratorImage; + private final String containerOrchestratorImagePullPolicy; private final String googleApplicationCredentials; private final AtomicReference> cachedExitValue; private final boolean 
useStreamCapableState; @@ -71,6 +72,7 @@ public AsyncOrchestratorPodProcess( final String secretName, final String secretMountPath, final String containerOrchestratorImage, + final String containerOrchestratorImagePullPolicy, final String googleApplicationCredentials, final boolean useStreamCapableState) { this.kubePodInfo = kubePodInfo; @@ -79,6 +81,7 @@ public AsyncOrchestratorPodProcess( this.secretName = secretName; this.secretMountPath = secretMountPath; this.containerOrchestratorImage = containerOrchestratorImage; + this.containerOrchestratorImagePullPolicy = containerOrchestratorImagePullPolicy; this.googleApplicationCredentials = googleApplicationCredentials; this.cachedExitValue = new AtomicReference<>(Optional.empty()); this.useStreamCapableState = useStreamCapableState; @@ -295,6 +298,7 @@ public void create(final Map allLabels, final var mainContainer = new ContainerBuilder() .withName(KubePodProcess.MAIN_CONTAINER_NAME) .withImage(containerOrchestratorImage) + .withImagePullPolicy(containerOrchestratorImagePullPolicy) .withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build()) .withEnv(envVars) .withPorts(containerPorts) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index 3dd1f602fdaf..fa237464fc45 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -126,6 +126,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException containerOrchestratorConfig.secretName(), containerOrchestratorConfig.secretMountPath(), containerOrchestratorConfig.containerOrchestratorImage(), + containerOrchestratorConfig.containerOrchestratorPullPolicy(), containerOrchestratorConfig.googleApplicationCredentials(), featureFlag.useStreamCapableState()); diff 
--git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java index 900a992a731d..878b36423d47 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import java.nio.file.Path; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -102,47 +103,52 @@ public void test() throws InterruptedException { // make kubepodinfo final var kubePodInfo = new KubePodInfo("default", podName); + final var pullPolicies = List.of("IfNotPresent", "Always"); // another activity issues the request to create the pod process -> here we'll just create it - final var asyncProcess = new AsyncOrchestratorPodProcess( - kubePodInfo, - documentStoreClient, - kubernetesClient, - null, - null, - "airbyte/container-orchestrator:dev", - null, - true); - - final Map portMap = Map.of( - WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, - OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, - OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, - OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, - OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); - - final Map envMap = System.getenv().entrySet().stream() - .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of( - 
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP, - OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap); - - // a final activity waits until there is output from the kube pod process - asyncProcess.waitFor(10, TimeUnit.SECONDS); - - final var exitValue = asyncProcess.exitValue(); - final var output = asyncProcess.getOutput(); - - assertEquals(0, exitValue); - assertTrue(output.isPresent()); - assertEquals("expected output", output.get()); + for (String pullPolicy : pullPolicies) { + final var asyncProcess = new AsyncOrchestratorPodProcess( + kubePodInfo, + documentStoreClient, + kubernetesClient, + null, + null, + "airbyte/container-orchestrator:dev", + pullPolicy, + null, + true); + + final Map portMap = Map.of( + WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, + OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, + OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, + OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, + OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); + + final Map envMap = System.getenv().entrySet().stream() + .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of( + OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP, + OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap); + + // a final activity waits until there is output from the kube pod process + asyncProcess.waitFor(10, TimeUnit.SECONDS); + + final var exitValue = asyncProcess.exitValue(); + final var output = asyncProcess.getOutput(); + + assertEquals(0, exitValue); + assertTrue(output.isPresent()); + assertEquals("expected output", output.get()); + } } @AfterAll public static void teardown() { try { portForwardProcess.destroyForcibly(); + } 
catch (final Exception e) { } catch (final Exception e) { e.printStackTrace(); } From 18b4df5bfa87f905d49b103adec48539f85f3aea Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 6 Aug 2022 14:24:05 -0400 Subject: [PATCH 2/8] make sure pod have different names --- ...AsyncOrchestratorPodProcessIntegrationTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java index 878b36423d47..05ad52395f8e 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java @@ -98,14 +98,15 @@ public static void init() throws Exception { @Test public void test() throws InterruptedException { - final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); - - // make kubepodinfo - final var kubePodInfo = new KubePodInfo("default", podName); final var pullPolicies = List.of("IfNotPresent", "Always"); - // another activity issues the request to create the pod process -> here we'll just create it - for (String pullPolicy : pullPolicies) { + for (final String pullPolicy : pullPolicies) { + + final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + // make kubepodinfo + final var kubePodInfo = new KubePodInfo("default", podName); + + // another activity issues the request to create the pod process -> here we'll just create it final var asyncProcess = new AsyncOrchestratorPodProcess( kubePodInfo, documentStoreClient, @@ -148,7 +149,6 @@ public void test() throws InterruptedException { public static void teardown() { try { portForwardProcess.destroyForcibly(); - } catch (final 
Exception e) { } catch (final Exception e) { e.printStackTrace(); } From b5f509dd4cc4d26202dc9be273117fc89d069eb5 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 6 Aug 2022 14:38:40 -0400 Subject: [PATCH 3/8] fix naming --- .../java/io/airbyte/workers/WorkerApp.java | 25 +++++++++------- .../workers/temporal/sync/LauncherWorker.java | 30 +++++++++---------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 70c92105e214..4fa5255039a7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -270,7 +270,7 @@ private void registerGetSpec(final WorkerFactory factory) { } private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) { return new ReplicationActivityImpl( containerOrchestratorConfig, @@ -286,7 +286,7 @@ private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs w } private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) { return new NormalizationActivityImpl( containerOrchestratorConfig, @@ -301,7 +301,7 @@ private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfi } private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) { return new DbtTransformationActivityImpl( containerOrchestratorConfig, @@ -351,14 +351,17 @@ private static WorkerOptions getWorkerOptions(final int max) { .build(); } - public record ContainerOrchestratorConfig( - String namespace, - DocumentStoreClient documentStoreClient, - 
KubernetesClient kubernetesClient, - String secretName, - String secretMountPath, - String containerOrchestratorImage, - String googleApplicationCredentials) {} + public static record ContainerOrchestratorConfig( + String namespace, + DocumentStoreClient documentStoreClient, + KubernetesClient kubernetesClient, + String secretName, + String secretMountPath, + String containerOrchestratorImage, + String containerOrchestratorImagePullPolicy, + String googleApplicationCredentials) { + + } static Optional getContainerOrchestratorConfig(final Configs configs) { if (configs.getContainerOrchestratorEnabled()) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index fa237464fc45..122e48e91081 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -38,10 +38,10 @@ import lombok.val; /** - * Coordinates configuring and managing the state of an async process. This is tied to the (job_id, - * attempt_id) and will attempt to kill off lower attempt ids. + * Coordinates configuring and managing the state of an async process. This is tied to the (job_id, attempt_id) and will attempt to kill off lower + * attempt ids. 
* - * @param a json-serializable input class for the worker + * @param a json-serializable input class for the worker * @param either {@link Void} or a json-serializable output class for the worker */ @Slf4j @@ -64,15 +64,14 @@ public class LauncherWorker implements Worker { private AsyncOrchestratorPodProcess process; public LauncherWorker(final UUID connectionId, - final String application, - final String podNamePrefix, - final JobRunConfig jobRunConfig, - final Map additionalFileMap, - final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, - final ResourceRequirements resourceRequirements, - final Class outputClass, - final Supplier activityContext) { - + final String application, + final String podNamePrefix, + final JobRunConfig jobRunConfig, + final Map additionalFileMap, + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, + final ResourceRequirements resourceRequirements, + final Class outputClass, + final Supplier activityContext) { this.connectionId = connectionId; this.application = application; this.podNamePrefix = podNamePrefix; @@ -126,7 +125,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException containerOrchestratorConfig.secretName(), containerOrchestratorConfig.secretMountPath(), containerOrchestratorConfig.containerOrchestratorImage(), - containerOrchestratorConfig.containerOrchestratorPullPolicy(), + containerOrchestratorConfig.containerOrchestratorImagePullPolicy(), containerOrchestratorConfig.googleApplicationCredentials(), featureFlag.useStreamCapableState()); @@ -187,9 +186,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException } /** - * It is imperative that we do not run multiple replications, normalizations, syncs, etc. at the - * same time. Our best bet is to kill everything that is labelled with the connection id and wait - * until no more pods exist with that connection id. 
+ * It is imperative that we do not run multiple replications, normalizations, syncs, etc. at the same time. Our best bet is to kill everything that + * is labelled with the connection id and wait until no more pods exist with that connection id. */ private void killRunningPodsForConnection() { final var client = containerOrchestratorConfig.kubernetesClient(); From 519b9e0d1ac271e76e1648778bd73fdff2667fd0 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Tue, 9 Aug 2022 03:17:46 -0400 Subject: [PATCH 4/8] formatting --- .../java/io/airbyte/workers/WorkerApp.java | 22 +++++++-------- .../workers/temporal/sync/LauncherWorker.java | 27 ++++++++++--------- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 4fa5255039a7..d8762a733067 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -270,7 +270,7 @@ private void registerGetSpec(final WorkerFactory factory) { } private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) { return new ReplicationActivityImpl( containerOrchestratorConfig, @@ -286,7 +286,7 @@ private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs w } private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) { return new NormalizationActivityImpl( containerOrchestratorConfig, @@ -301,7 +301,7 @@ private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfi } private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory) { + final ProcessFactory jobProcessFactory) 
{ return new DbtTransformationActivityImpl( containerOrchestratorConfig, @@ -352,14 +352,14 @@ private static WorkerOptions getWorkerOptions(final int max) { } public static record ContainerOrchestratorConfig( - String namespace, - DocumentStoreClient documentStoreClient, - KubernetesClient kubernetesClient, - String secretName, - String secretMountPath, - String containerOrchestratorImage, - String containerOrchestratorImagePullPolicy, - String googleApplicationCredentials) { + String namespace, + DocumentStoreClient documentStoreClient, + KubernetesClient kubernetesClient, + String secretName, + String secretMountPath, + String containerOrchestratorImage, + String containerOrchestratorImagePullPolicy, + String googleApplicationCredentials) { } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index 122e48e91081..30b2b4481ef3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -38,10 +38,10 @@ import lombok.val; /** - * Coordinates configuring and managing the state of an async process. This is tied to the (job_id, attempt_id) and will attempt to kill off lower - * attempt ids. + * Coordinates configuring and managing the state of an async process. This is tied to the (job_id, + * attempt_id) and will attempt to kill off lower attempt ids. 
* - * @param a json-serializable input class for the worker + * @param a json-serializable input class for the worker * @param either {@link Void} or a json-serializable output class for the worker */ @Slf4j @@ -64,14 +64,14 @@ public class LauncherWorker implements Worker { private AsyncOrchestratorPodProcess process; public LauncherWorker(final UUID connectionId, - final String application, - final String podNamePrefix, - final JobRunConfig jobRunConfig, - final Map additionalFileMap, - final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, - final ResourceRequirements resourceRequirements, - final Class outputClass, - final Supplier activityContext) { + final String application, + final String podNamePrefix, + final JobRunConfig jobRunConfig, + final Map additionalFileMap, + final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, + final ResourceRequirements resourceRequirements, + final Class outputClass, + final Supplier activityContext) { this.connectionId = connectionId; this.application = application; this.podNamePrefix = podNamePrefix; @@ -186,8 +186,9 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException } /** - * It is imperative that we do not run multiple replications, normalizations, syncs, etc. at the same time. Our best bet is to kill everything that - * is labelled with the connection id and wait until no more pods exist with that connection id. + * It is imperative that we do not run multiple replications, normalizations, syncs, etc. at the + * same time. Our best bet is to kill everything that is labelled with the connection id and wait + * until no more pods exist with that connection id. 
*/ private void killRunningPodsForConnection() { final var client = containerOrchestratorConfig.kubernetesClient(); From 9e2714c131f4c9fecc9dc32cb38b404c069d5fd7 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 13 Aug 2022 10:43:01 -0400 Subject: [PATCH 5/8] adding container info and moving fields into it --- .../DefaultAsyncStateManagerTest.java | 4 +- .../process/AsyncOrchestratorPodProcess.java | 10 +- .../workers/process/KubeContainerInfo.java | 7 ++ .../airbyte/workers/process/KubePodInfo.java | 2 +- .../workers/process/KubePodProcess.java | 21 +++- .../workers/temporal/sync/LauncherWorker.java | 10 +- ...OrchestratorPodProcessIntegrationTest.java | 95 +++++++++---------- 7 files changed, 85 insertions(+), 64 deletions(-) create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/process/KubeContainerInfo.java diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java index 4ac2c82bea80..53ae95618b3b 100644 --- a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java +++ b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java @@ -11,6 +11,7 @@ import io.airbyte.workers.general.DocumentStoreClient; import io.airbyte.workers.process.AsyncKubePodStatus; +import io.airbyte.workers.process.KubeContainerInfo; import io.airbyte.workers.process.KubePodInfo; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -18,7 +19,8 @@ class DefaultAsyncStateManagerTest { - private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1"); + + private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1", new KubeContainerInfo("busybox", "IfNotPresent")); private static final String OUTPUT = "some output value"; 
private DocumentStoreClient documentStore; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java index cf0bc25129c4..b17583e613fa 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/AsyncOrchestratorPodProcess.java @@ -59,8 +59,6 @@ public class AsyncOrchestratorPodProcess implements KubePod { private final KubernetesClient kubernetesClient; private final String secretName; private final String secretMountPath; - private final String containerOrchestratorImage; - private final String containerOrchestratorImagePullPolicy; private final String googleApplicationCredentials; private final AtomicReference> cachedExitValue; private final boolean useStreamCapableState; @@ -71,8 +69,6 @@ public AsyncOrchestratorPodProcess( final KubernetesClient kubernetesClient, final String secretName, final String secretMountPath, - final String containerOrchestratorImage, - final String containerOrchestratorImagePullPolicy, final String googleApplicationCredentials, final boolean useStreamCapableState) { this.kubePodInfo = kubePodInfo; @@ -80,8 +76,6 @@ public AsyncOrchestratorPodProcess( this.kubernetesClient = kubernetesClient; this.secretName = secretName; this.secretMountPath = secretMountPath; - this.containerOrchestratorImage = containerOrchestratorImage; - this.containerOrchestratorImagePullPolicy = containerOrchestratorImagePullPolicy; this.googleApplicationCredentials = googleApplicationCredentials; this.cachedExitValue = new AtomicReference<>(Optional.empty()); this.useStreamCapableState = useStreamCapableState; @@ -297,8 +291,8 @@ public void create(final Map allLabels, final var mainContainer = new ContainerBuilder() .withName(KubePodProcess.MAIN_CONTAINER_NAME) - .withImage(containerOrchestratorImage) - 
.withImagePullPolicy(containerOrchestratorImagePullPolicy) + .withImage(kubePodInfo.mainContainerInfo().image()) + .withImagePullPolicy(kubePodInfo.mainContainerInfo().pullPolicy()) .withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build()) .withEnv(envVars) .withPorts(containerPorts) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeContainerInfo.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeContainerInfo.java new file mode 100644 index 000000000000..bafe637c3bf8 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeContainerInfo.java @@ -0,0 +1,7 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +public record KubeContainerInfo(String image, String pullPolicy) {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java index 18be5eae58c7..897b083d5f26 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodInfo.java @@ -4,4 +4,4 @@ package io.airbyte.workers.process; -public record KubePodInfo(String namespace, String name) {} +public record KubePodInfo(String namespace, String name, KubeContainerInfo mainContainerInfo) {} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 9caa5de50522..02e7773be0c1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -674,9 +675,27 @@ public Info info() { return new KubePodProcessInfo(podDefinition.getMetadata().getName()); } + private Container getMainContainerFromPodDefinition() { + final Optional containerOptional = podDefinition.getSpec() + .getContainers() + .stream() + .filter(c -> c.getName().contentEquals(MAIN_CONTAINER_NAME)) + .findFirst(); + if (containerOptional.isEmpty()) { + throw new RuntimeException(String.format("Could not find main container definition for pod: %s", podDefinition.getMetadata().getName())); + } else { + return containerOptional.get(); + } + + } + @Override public KubePodInfo getInfo() { - return new KubePodInfo(podDefinition.getMetadata().getNamespace(), podDefinition.getMetadata().getName()); + final Container mainContainer = getMainContainerFromPodDefinition(); + final KubeContainerInfo mainContainerInfo = new KubeContainerInfo(mainContainer.getImage(), mainContainer.getImagePullPolicy()); + return new KubePodInfo(podDefinition.getMetadata().getNamespace(), + podDefinition.getMetadata().getName(), + mainContainerInfo); } /** diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index 30b2b4481ef3..d58951414a13 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -15,6 +15,7 @@ import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.process.AsyncKubePodStatus; import io.airbyte.workers.process.AsyncOrchestratorPodProcess; +import io.airbyte.workers.process.KubeContainerInfo; import io.airbyte.workers.process.KubePodInfo; import io.airbyte.workers.process.KubePodResourceHelper; import io.airbyte.workers.process.KubeProcessFactory; @@ -114,8 +115,11 @@ public OUTPUT run(final 
INPUT input, final Path jobRoot) throws WorkerException final var podNameAndJobPrefix = podNamePrefix + "-job-" + jobRunConfig.getJobId() + "-attempt-"; final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId(); - - final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName); + final var mainContainerInfo = new KubeContainerInfo(containerOrchestratorConfig.containerOrchestratorImage(), + containerOrchestratorConfig.containerOrchestratorImagePullPolicy()); + final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), + podName, + mainContainerInfo); val featureFlag = new EnvVariableFeatureFlags(); process = new AsyncOrchestratorPodProcess( @@ -124,8 +128,6 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException containerOrchestratorConfig.kubernetesClient(), containerOrchestratorConfig.secretName(), containerOrchestratorConfig.secretMountPath(), - containerOrchestratorConfig.containerOrchestratorImage(), - containerOrchestratorConfig.containerOrchestratorImagePullPolicy(), containerOrchestratorConfig.googleApplicationCredentials(), featureFlag.useStreamCapableState()); diff --git a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java index 05ad52395f8e..eec896b4e399 100644 --- a/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java +++ b/airbyte-workers/src/test-integration/java/io/airbyte/workers/process/AsyncOrchestratorPodProcessIntegrationTest.java @@ -24,7 +24,6 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import java.nio.file.Path; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -32,7 
+31,8 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; @Disabled @@ -96,53 +96,50 @@ public static void init() throws Exception { Path.of("/")); } - @Test - public void test() throws InterruptedException { - - final var pullPolicies = List.of("IfNotPresent", "Always"); - for (final String pullPolicy : pullPolicies) { - - final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); - // make kubepodinfo - final var kubePodInfo = new KubePodInfo("default", podName); - - // another activity issues the request to create the pod process -> here we'll just create it - final var asyncProcess = new AsyncOrchestratorPodProcess( - kubePodInfo, - documentStoreClient, - kubernetesClient, - null, - null, - "airbyte/container-orchestrator:dev", - pullPolicy, - null, - true); - - final Map portMap = Map.of( - WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, - OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, - OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, - OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, - OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); - - final Map envMap = System.getenv().entrySet().stream() - .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of( - OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP, - OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap); - - // a final activity waits until there is output from the kube pod process - 
asyncProcess.waitFor(10, TimeUnit.SECONDS); - - final var exitValue = asyncProcess.exitValue(); - final var output = asyncProcess.getOutput(); - - assertEquals(0, exitValue); - assertTrue(output.isPresent()); - assertEquals("expected output", output.get()); - } + @ValueSource(strings = {"IfNotPresent", "Always"}) + @ParameterizedTest + public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws InterruptedException { + + final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase(); + final var mainContainerInfo = new KubeContainerInfo("airbyte/container-orchestrator:dev", pullPolicy); + // make kubepodinfo + final var kubePodInfo = new KubePodInfo("default", podName, mainContainerInfo); + + // another activity issues the request to create the pod process -> here we'll just create it + final var asyncProcess = new AsyncOrchestratorPodProcess( + kubePodInfo, + documentStoreClient, + kubernetesClient, + null, + null, + null, + true); + + final Map portMap = Map.of( + WorkerApp.KUBE_HEARTBEAT_PORT, WorkerApp.KUBE_HEARTBEAT_PORT, + OrchestratorConstants.PORT1, OrchestratorConstants.PORT1, + OrchestratorConstants.PORT2, OrchestratorConstants.PORT2, + OrchestratorConstants.PORT3, OrchestratorConstants.PORT3, + OrchestratorConstants.PORT4, OrchestratorConstants.PORT4); + + final Map envMap = System.getenv().entrySet().stream() + .filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of( + OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP, + OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap); + + // a final activity waits until there is output from the kube pod process + asyncProcess.waitFor(10, TimeUnit.SECONDS); + + final var exitValue = asyncProcess.exitValue(); + final var 
output = asyncProcess.getOutput(); + + assertEquals(0, exitValue); + assertTrue(output.isPresent()); + assertEquals("expected output", output.get()); + } @AfterAll From e37ebe184d143d41f2900f49d2961044e0fbccd4 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 13 Aug 2022 10:58:46 -0400 Subject: [PATCH 6/8] formatting --- .../container_orchestrator/DefaultAsyncStateManagerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java index 53ae95618b3b..8d70cd30180a 100644 --- a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java +++ b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java @@ -19,7 +19,6 @@ class DefaultAsyncStateManagerTest { - private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1", new KubeContainerInfo("busybox", "IfNotPresent")); private static final String OUTPUT = "some output value"; From 8067114a55acea8953138a4ede88d431a4efc900 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Sat, 13 Aug 2022 11:10:26 -0400 Subject: [PATCH 7/8] make image fake image on state manager test --- .../container_orchestrator/DefaultAsyncStateManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java index 8d70cd30180a..76b2b85b929c 100644 --- a/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java +++ 
b/airbyte-container-orchestrator/src/test/java/io/airbyte/container_orchestrator/DefaultAsyncStateManagerTest.java @@ -19,7 +19,8 @@ class DefaultAsyncStateManagerTest { - private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1", new KubeContainerInfo("busybox", "IfNotPresent")); + public static final String FAKE_IMAGE = "fake_image"; + private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1", new KubeContainerInfo(FAKE_IMAGE, "IfNotPresent")); private static final String OUTPUT = "some output value"; private DocumentStoreClient documentStore; From 6a42f44fe34c4118f6729fa0bf87d1716162d138 Mon Sep 17 00:00:00 2001 From: Guy Feldman Date: Thu, 18 Aug 2022 03:10:39 -0400 Subject: [PATCH 8/8] logging a warning instead of a hard exception --- .../main/java/io/airbyte/workers/process/KubePodProcess.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java index 02e7773be0c1..80c6f5c17c85 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -682,11 +682,11 @@ private Container getMainContainerFromPodDefinition() { .filter(c -> c.getName().contentEquals(MAIN_CONTAINER_NAME)) .findFirst(); if (containerOptional.isEmpty()) { - throw new RuntimeException(String.format("Could not find main container definition for pod: %s", podDefinition.getMetadata().getName())); + LOGGER.warn(String.format("Could not find main container definition for pod: %s", podDefinition.getMetadata().getName())); + return null; } else { return containerOptional.get(); } - } @Override