Skip to content

Commit

Permalink
Revert "Revert "Fix connector resource defaults location (#7342)"" (#…
Browse files Browse the repository at this point in the history
…7399)
  • Loading branch information
gosusnp committed Jun 22, 2023
1 parent d8c83ba commit e4f1082
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.text.StringEscapeUtils;
Expand Down Expand Up @@ -103,6 +104,7 @@
// it is required for the connectors
@SuppressWarnings("PMD.AvoidPrintStackTrace")
// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700.
@Slf4j
public class KubePodProcess implements KubePod {

private static final Configs configs = new EnvConfigs();
Expand Down Expand Up @@ -883,20 +885,26 @@ public Info info() {
*/
public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final ResourceRequirements resourceRequirements) {
if (resourceRequirements != null) {
Quantity cpuLimit = null;
Quantity memoryLimit = null;
final Map<String, Quantity> limitMap = new HashMap<>();
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getCpuLimit())) {
cpuLimit = Quantity.parse(resourceRequirements.getCpuLimit());
limitMap.put("cpu", cpuLimit);
}
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getMemoryLimit())) {
memoryLimit = Quantity.parse(resourceRequirements.getMemoryLimit());
limitMap.put("memory", memoryLimit);
}
final Map<String, Quantity> requestMap = new HashMap<>();
// if null then use unbounded resource allocation
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getCpuRequest())) {
requestMap.put("cpu", Quantity.parse(resourceRequirements.getCpuRequest()));
final Quantity cpuRequest = Quantity.parse(resourceRequirements.getCpuRequest());
requestMap.put("cpu", min(cpuRequest, cpuLimit));
}
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getMemoryRequest())) {
requestMap.put("memory", Quantity.parse(resourceRequirements.getMemoryRequest()));
}
final Map<String, Quantity> limitMap = new HashMap<>();
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getCpuLimit())) {
limitMap.put("cpu", Quantity.parse(resourceRequirements.getCpuLimit()));
}
if (!com.google.common.base.Strings.isNullOrEmpty(resourceRequirements.getMemoryLimit())) {
limitMap.put("memory", Quantity.parse(resourceRequirements.getMemoryLimit()));
final Quantity memoryRequest = Quantity.parse(resourceRequirements.getMemoryRequest());
requestMap.put("memory", min(memoryRequest, memoryLimit));
}
return new ResourceRequirementsBuilder()
.withRequests(requestMap)
Expand All @@ -905,6 +913,21 @@ public static ResourceRequirementsBuilder getResourceRequirementsBuilder(final R
return new ResourceRequirementsBuilder();
}

private static Quantity min(final Quantity request, final Quantity limit) {
if (limit == null) {
return request;
}
if (request == null) {
return limit;
}
if (request.getNumericalAmount().compareTo(limit.getNumericalAmount()) <= 0) {
return request;
} else {
log.info("Invalid resource requirements detected, requested {} while limit is {}, falling back to requesting {}.", request, limit, limit);
return limit;
}
}

private static String prependPodInfo(final String message, final String podNamespace, final String podName) {
return String.format("(pod: %s / %s) - %s", podNamespace, podName, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
package io.airbyte.workers.process;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.BuildImageResultCallback;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
import io.airbyte.commons.string.Strings;
import io.airbyte.config.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.File;
Expand All @@ -30,49 +33,98 @@
import org.testcontainers.shaded.com.github.dockerjava.core.DockerClientImpl;
import org.testcontainers.shaded.com.google.common.io.Resources;

// Disabled until we start minikube on the node.
@Disabled
class KubePodProcessTest {

private static final KubernetesClient K8s = new DefaultKubernetesClient();
private static final String CPU = "cpu";
private static final String MEMORY = "memory";

@Test
@DisplayName("Should build resource requirements.")
void testBuildResourceRequirements() {
final var reqs = KubePodProcess.getResourceRequirementsBuilder(new ResourceRequirements()
.withCpuRequest("1")
.withCpuLimit("2")
.withMemoryRequest("1000Mi")
.withMemoryLimit("1Gi"));
final var actualReqs = reqs.build();

assertEquals(new Quantity("1"), actualReqs.getRequests().get(CPU));
assertEquals(new Quantity("2"), actualReqs.getLimits().get(CPU));
assertEquals(new Quantity("1000Mi"), actualReqs.getRequests().get(MEMORY));
assertEquals(new Quantity("1Gi"), actualReqs.getLimits().get(MEMORY));
}

private static final String TEST_IMAGE_WITH_VAR_PATH = "Dockerfile.with_var";
private static final String TEST_IMAGE_WITH_VAR_NAME = "worker-test:with-var";
@Test
@DisplayName("Should build resource requirements with partial infos.")
void testBuildResourceRequirementsWithPartialInfo() {
final var reqs = KubePodProcess.getResourceRequirementsBuilder(new ResourceRequirements()
.withCpuRequest("5")
.withMemoryLimit("4Gi"));
final var actualReqs = reqs.build();

assertEquals(new Quantity("5"), actualReqs.getRequests().get(CPU));
assertNull(actualReqs.getLimits().get(CPU));
assertNull(actualReqs.getRequests().get(MEMORY));
assertEquals(new Quantity("4Gi"), actualReqs.getLimits().get(MEMORY));
}

private static final String TEST_IMAGE_NO_VAR_PATH = "Dockerfile.no_var";
private static final String TEST_IMAGE_NO_VAR_NAME = "worker-test:no-var";
@Test
@DisplayName("Should build resource requirements that don't have conflicts.")
void testBuildResourceRequirementsShouldEnsureRequestFitsWithinLimits() {
final var reqs = KubePodProcess.getResourceRequirementsBuilder(new ResourceRequirements()
.withCpuRequest("1")
.withCpuLimit("0.5")
.withMemoryRequest("1000Mi")
.withMemoryLimit("0.5Gi"));
final var actualReqs = reqs.build();

assertEquals(new Quantity("0.5"), actualReqs.getRequests().get(CPU));
assertEquals(new Quantity("0.5"), actualReqs.getLimits().get(CPU));
assertEquals(new Quantity("0.5Gi"), actualReqs.getRequests().get(MEMORY));
assertEquals(new Quantity("0.5Gi"), actualReqs.getLimits().get(MEMORY));
}

private class DockerUtils {
// Disabled until we start minikube on the node.
@Disabled
@Nested
class GetPodIp {

private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder()
.dockerHost(CONFIG.getDockerHost())
.sslConfig(CONFIG.getSSLConfig())
.maxConnections(100)
.build();
private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT);
private static final KubernetesClient K8s = new DefaultKubernetesClient();

public static String buildImage(final String dockerFilePath, final String tag) {
return DOCKER_CLIENT.buildImageCmd()
.withDockerfile(new File(dockerFilePath))
.withTags(Set.of(tag))
.exec(new BuildImageResultCallback())
.awaitImageId();
}
private static final String TEST_IMAGE_WITH_VAR_PATH = "Dockerfile.with_var";
private static final String TEST_IMAGE_WITH_VAR_NAME = "worker-test:with-var";

}
private static final String TEST_IMAGE_NO_VAR_PATH = "Dockerfile.no_var";
private static final String TEST_IMAGE_NO_VAR_NAME = "worker-test:no-var";

@BeforeAll
static void setup() {
final var varDockerfile = Resources.getResource(TEST_IMAGE_WITH_VAR_PATH);
DockerUtils.buildImage(varDockerfile.getPath(), TEST_IMAGE_WITH_VAR_NAME);
private class DockerUtils {

final var noVarDockerfile = Resources.getResource(TEST_IMAGE_NO_VAR_PATH);
DockerUtils.buildImage(noVarDockerfile.getPath(), TEST_IMAGE_NO_VAR_NAME);
}
private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder()
.dockerHost(CONFIG.getDockerHost())
.sslConfig(CONFIG.getSSLConfig())
.maxConnections(100)
.build();
private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT);

@Nested
class GetPodIp {
public static String buildImage(final String dockerFilePath, final String tag) {
return DOCKER_CLIENT.buildImageCmd()
.withDockerfile(new File(dockerFilePath))
.withTags(Set.of(tag))
.exec(new BuildImageResultCallback())
.awaitImageId();
}

}

@BeforeAll
static void setup() {
final var varDockerfile = Resources.getResource(TEST_IMAGE_WITH_VAR_PATH);
DockerUtils.buildImage(varDockerfile.getPath(), TEST_IMAGE_WITH_VAR_NAME);

final var noVarDockerfile = Resources.getResource(TEST_IMAGE_NO_VAR_PATH);
DockerUtils.buildImage(noVarDockerfile.getPath(), TEST_IMAGE_NO_VAR_NAME);
}

@Test
@DisplayName("Should error when the given pod does not exists.")
Expand Down
10 changes: 0 additions & 10 deletions airbyte-container-orchestrator/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,6 @@ airbyte:
cpu-request: ${REPLICATION_ORCHESTRATOR_CPU_REQUEST:}
memory-limit: ${REPLICATION_ORCHESTRATOR_MEMORY_LIMIT:}
memory-request: ${REPLICATION_ORCHESTRATOR_MEMORY_REQUEST:}
source:
cpu-limit: ${SOURCE_CONTAINER_CPU_LIMIT:}
cpu-request: ${SOURCE_CONTAINER_CPU_REQUEST:0.5}
memory-limit: ${SOURCE_CONTAINER_MEMORY_LIMIT:}
memory-request: ${SOURCE_CONTAINER_MEMORY_REQUEST:}
source-database:
cpu-limit: ${SOURCE_DATABASE_CONTAINER_CPU_LIMIT:}
cpu-request: ${SOURCE_DATABASE_CONTAINER_CPU_REQUEST:1}
memory-limit: ${SOURCE_DATABASE_CONTAINER_MEMORY_LIMIT:}
memory-request: ${SOURCE_DATABASE_CONTAINER_MEMORY_REQUEST:}
replication:
persistence-flush-period-sec: ${REPLICATION_FLUSH_PERIOD_SECONDS:60}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ResourceRequirementsType;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.airbyte.persistence.job.models.JobRunConfig;
Expand All @@ -32,7 +30,6 @@
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;

// tests may be running on a real k8s environment, override the environment to something else for
Expand Down Expand Up @@ -125,24 +122,4 @@ void jobOrchestrator() {
assertTrue(caught, "invalid application name should have thrown an exception");
}

@Test
void checkDatabaseSourceResourceRequirements() {
final ResourceRequirements resourceRequirements =
workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of("database"));

assertEquals("1", resourceRequirements.getCpuRequest());
// This is verifying that we are inheriting the value from default.
assertEquals("1", resourceRequirements.getCpuLimit());
}

@Test
void checkSourceResourceRequirements() {
final ResourceRequirements resourceRequirements =
workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of("any"));

assertEquals("0.5", resourceRequirements.getCpuRequest());
// This is verifying that we are inheriting the value from default.
assertEquals("1", resourceRequirements.getCpuLimit());
}

}
10 changes: 10 additions & 0 deletions airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ airbyte:
cpu-request: ${REPLICATION_ORCHESTRATOR_CPU_REQUEST:}
memory-limit: ${REPLICATION_ORCHESTRATOR_MEMORY_LIMIT:}
memory-request: ${REPLICATION_ORCHESTRATOR_MEMORY_REQUEST:}
source:
cpu-limit: ${SOURCE_CONTAINER_CPU_LIMIT:}
cpu-request: ${SOURCE_CONTAINER_CPU_REQUEST:0.5}
memory-limit: ${SOURCE_CONTAINER_MEMORY_LIMIT:}
memory-request: ${SOURCE_CONTAINER_MEMORY_REQUEST:}
source-database:
cpu-limit: ${SOURCE_DATABASE_CONTAINER_CPU_LIMIT:}
cpu-request: ${SOURCE_DATABASE_CONTAINER_CPU_REQUEST:1}
memory-limit: ${SOURCE_DATABASE_CONTAINER_MEMORY_LIMIT:}
memory-request: ${SOURCE_DATABASE_CONTAINER_MEMORY_REQUEST:}
spec:
annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:}
node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.ResourceRequirementsType;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.config.WorkerConfigsProvider.ResourceType;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.Optional;
import org.junit.jupiter.api.Test;

// We are overriding the default config with application-config-test.yaml for consistency of the
Expand Down Expand Up @@ -73,4 +76,24 @@ void checkWorkerConfigProvider() {
assertEquals("", specKubeConfig.getResourceRequirements().getMemoryRequest());
}

@Test
void checkDatabaseSourceResourceRequirements() {
final ResourceRequirements resourceRequirements =
workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of("database"));

assertEquals("1", resourceRequirements.getCpuRequest());
// This is verifying that we are inheriting the value from default.
assertEquals("default cpu limit", resourceRequirements.getCpuLimit());
}

@Test
void checkSourceResourceRequirements() {
final ResourceRequirements resourceRequirements =
workerConfigsProvider.getResourceRequirements(ResourceRequirementsType.SOURCE, Optional.of("any"));

assertEquals("0.5", resourceRequirements.getCpuRequest());
// This is verifying that we are inheriting the value from default.
assertEquals("default cpu limit", resourceRequirements.getCpuLimit());
}

}

0 comments on commit e4f1082

Please sign in to comment.