Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set the image pull policy on the container orchestration #15385

Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@

import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.KubeContainerInfo;
import io.airbyte.workers.process.KubePodInfo;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class DefaultAsyncStateManagerTest {

private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1");
public static final String FAKE_IMAGE = "fake_image";
private static final KubePodInfo KUBE_POD_INFO = new KubePodInfo("default", "pod1", new KubeContainerInfo(FAKE_IMAGE, "IfNotPresent"));
private static final String OUTPUT = "some output value";

private DocumentStoreClient documentStore;
Expand Down
20 changes: 12 additions & 8 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,17 @@ private static WorkerOptions getWorkerOptions(final int max) {
.build();
}

public record ContainerOrchestratorConfig(
String namespace,
DocumentStoreClient documentStoreClient,
KubernetesClient kubernetesClient,
String secretName,
String secretMountPath,
String containerOrchestratorImage,
String googleApplicationCredentials) {}
public static record ContainerOrchestratorConfig(
String namespace,
DocumentStoreClient documentStoreClient,
KubernetesClient kubernetesClient,
String secretName,
String secretMountPath,
String containerOrchestratorImage,
String containerOrchestratorImagePullPolicy,
String googleApplicationCredentials) {

}

static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(final Configs configs) {
if (configs.getContainerOrchestratorEnabled()) {
Expand All @@ -375,6 +378,7 @@ static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(fina
configs.getContainerOrchestratorSecretName(),
configs.getContainerOrchestratorSecretMountPath(),
configs.getContainerOrchestratorImage(),
configs.getJobKubeMainContainerImagePullPolicy(),
configs.getGoogleApplicationCredentials()));
} else {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class AsyncOrchestratorPodProcess implements KubePod {
private final KubernetesClient kubernetesClient;
private final String secretName;
private final String secretMountPath;
private final String containerOrchestratorImage;
private final String googleApplicationCredentials;
private final AtomicReference<Optional<Integer>> cachedExitValue;
private final boolean useStreamCapableState;
Expand All @@ -70,15 +69,13 @@ public AsyncOrchestratorPodProcess(
final KubernetesClient kubernetesClient,
final String secretName,
final String secretMountPath,
final String containerOrchestratorImage,
final String googleApplicationCredentials,
final boolean useStreamCapableState) {
this.kubePodInfo = kubePodInfo;
this.documentStoreClient = documentStoreClient;
this.kubernetesClient = kubernetesClient;
this.secretName = secretName;
this.secretMountPath = secretMountPath;
this.containerOrchestratorImage = containerOrchestratorImage;
this.googleApplicationCredentials = googleApplicationCredentials;
this.cachedExitValue = new AtomicReference<>(Optional.empty());
this.useStreamCapableState = useStreamCapableState;
Expand Down Expand Up @@ -294,7 +291,8 @@ public void create(final Map<String, String> allLabels,

final var mainContainer = new ContainerBuilder()
.withName(KubePodProcess.MAIN_CONTAINER_NAME)
.withImage(containerOrchestratorImage)
.withImage(kubePodInfo.mainContainerInfo().image())
.withImagePullPolicy(kubePodInfo.mainContainerInfo().pullPolicy())
.withResources(KubePodProcess.getResourceRequirementsBuilder(resourceRequirements).build())
.withEnv(envVars)
.withPorts(containerPorts)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.process;

public record KubeContainerInfo(String image, String pullPolicy) {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

package io.airbyte.workers.process;

public record KubePodInfo(String namespace, String name) {}
public record KubePodInfo(String namespace, String name, KubeContainerInfo mainContainerInfo) {}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -674,9 +675,27 @@ public Info info() {
return new KubePodProcessInfo(podDefinition.getMetadata().getName());
}

private Container getMainContainerFromPodDefinition() {
final Optional<Container> containerOptional = podDefinition.getSpec()
.getContainers()
.stream()
.filter(c -> c.getName().contentEquals(MAIN_CONTAINER_NAME))
.findFirst();
if (containerOptional.isEmpty()) {
LOGGER.warn(String.format("Could not find main container definition for pod: %s", podDefinition.getMetadata().getName()));
return null;
} else {
return containerOptional.get();
}
}

@Override
public KubePodInfo getInfo() {
return new KubePodInfo(podDefinition.getMetadata().getNamespace(), podDefinition.getMetadata().getName());
final Container mainContainer = getMainContainerFromPodDefinition();
final KubeContainerInfo mainContainerInfo = new KubeContainerInfo(mainContainer.getImage(), mainContainer.getImagePullPolicy());
return new KubePodInfo(podDefinition.getMetadata().getNamespace(),
podDefinition.getMetadata().getName(),
mainContainerInfo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.KubeContainerInfo;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.process.KubePodResourceHelper;
import io.airbyte.workers.process.KubeProcessFactory;
Expand Down Expand Up @@ -72,7 +73,6 @@ public LauncherWorker(final UUID connectionId,
final ResourceRequirements resourceRequirements,
final Class<OUTPUT> outputClass,
final Supplier<ActivityExecutionContext> activityContext) {

this.connectionId = connectionId;
this.application = application;
this.podNamePrefix = podNamePrefix;
Expand Down Expand Up @@ -115,8 +115,11 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException

final var podNameAndJobPrefix = podNamePrefix + "-job-" + jobRunConfig.getJobId() + "-attempt-";
final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId();

final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(), podName);
final var mainContainerInfo = new KubeContainerInfo(containerOrchestratorConfig.containerOrchestratorImage(),
containerOrchestratorConfig.containerOrchestratorImagePullPolicy());
final var kubePodInfo = new KubePodInfo(containerOrchestratorConfig.namespace(),
podName,
mainContainerInfo);
val featureFlag = new EnvVariableFeatureFlags();

process = new AsyncOrchestratorPodProcess(
Expand All @@ -125,7 +128,6 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
containerOrchestratorConfig.kubernetesClient(),
containerOrchestratorConfig.secretName(),
containerOrchestratorConfig.secretMountPath(),
containerOrchestratorConfig.containerOrchestratorImage(),
containerOrchestratorConfig.googleApplicationCredentials(),
featureFlag.useStreamCapableState());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;

@Disabled
Expand Down Expand Up @@ -95,12 +96,14 @@ public static void init() throws Exception {
Path.of("/"));
}

@Test
public void test() throws InterruptedException {
final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
@ValueSource(strings = {"IfNotPresent", " Always"})
@ParameterizedTest
public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws InterruptedException {

final var podName = "test-async-" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final var mainContainerInfo = new KubeContainerInfo("airbyte/container-orchestrator:dev", pullPolicy);
// make kubepodinfo
final var kubePodInfo = new KubePodInfo("default", podName);
final var kubePodInfo = new KubePodInfo("default", podName, mainContainerInfo);

// another activity issues the request to create the pod process -> here we'll just create it
final var asyncProcess = new AsyncOrchestratorPodProcess(
Expand All @@ -109,7 +112,6 @@ public void test() throws InterruptedException {
kubernetesClient,
null,
null,
"airbyte/container-orchestrator:dev",
null,
true);

Expand Down Expand Up @@ -137,6 +139,7 @@ public void test() throws InterruptedException {
assertEquals(0, exitValue);
assertTrue(output.isPresent());
assertEquals("expected output", output.get());

}

@AfterAll
Expand Down