From 54c0ef12b1acc59348480e86a4f080206a833ca7 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 28 Dec 2022 13:41:47 -0800 Subject: [PATCH] Add orchestrator label. (#20904) Add the orchestrator label to orchestrators so we can better differentiate orchestrator pods. This is useful since orchestrator pods are the only pods in the job namespace with a need to talk to the main Airbyte application pods. These labels allow us to apply more granular network filtering. Also took the chance to do some clean up of labels. --- .../general/DbtTransformationRunner.java | 11 +++-- .../DefaultNormalizationRunner.java | 11 +++-- .../process/AirbyteIntegrationLauncher.java | 39 ++++++----------- .../workers/process/KubeProcessFactory.java | 15 ++++--- .../io/airbyte/workers/process/Metadata.java | 42 +++++++++++++++++++ .../airbyte/workers/sync/LauncherWorker.java | 6 ++- .../DefaultNormalizationRunnerTest.java | 10 +++-- .../AirbyteIntegrationLauncherTest.java | 26 ++++++------ .../workers/process/ProcessFactoryTest.java | 12 +++--- .../ReplicationJobOrchestrator.java | 2 +- 10 files changed, 107 insertions(+), 67 deletions(-) create mode 100644 airbyte-commons-worker/src/main/java/io/airbyte/workers/process/Metadata.java diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java index 62316d0df602..7abd7bc7b672 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DbtTransformationRunner.java @@ -4,6 +4,11 @@ package io.airbyte.workers.general; +import static io.airbyte.workers.process.Metadata.CUSTOM_STEP; +import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -17,7 +22,6 @@ import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.normalization.NormalizationRunner; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; import java.nio.file.Path; import java.util.ArrayList; @@ -93,7 +97,7 @@ public boolean transform(final String jobId, Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments())); process = processFactory.create( - AirbyteIntegrationLauncher.CUSTOM_STEP, + CUSTOM_STEP, jobId, attempt, jobRoot, @@ -103,8 +107,7 @@ public boolean transform(final String jobId, files, "/bin/bash", resourceRequirements, - Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP, - AirbyteIntegrationLauncher.CUSTOM_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, CUSTOM_STEP), Collections.emptyMap(), Collections.emptyMap(), dbtArguments.toArray(new String[0])); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index e96f9c81d403..8c2813804069 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -4,6 +4,11 @@ package io.airbyte.workers.normalization; +import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY; +import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -25,7 +30,6 @@ import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.exception.WorkerException; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; import java.io.InputStream; import java.nio.file.Path; @@ -120,7 +124,7 @@ private boolean runProcess(final String jobId, try { LOGGER.info("Running with normalization version: {}", normalizationImageName); process = processFactory.create( - AirbyteIntegrationLauncher.NORMALIZE_STEP, + NORMALIZE_STEP, jobId, attempt, jobRoot, @@ -130,8 +134,7 @@ private boolean runProcess(final String jobId, false, files, null, resourceRequirements, - Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP, - AirbyteIntegrationLauncher.NORMALIZE_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP), Collections.emptyMap(), Collections.emptyMap(), args); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java index 3d6871786731..b28db1742ec5 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/AirbyteIntegrationLauncher.java @@ -8,6 +8,14 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; +import static io.airbyte.workers.process.Metadata.CHECK_JOB; +import static io.airbyte.workers.process.Metadata.DISCOVER_JOB; +import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY; +import static io.airbyte.workers.process.Metadata.READ_STEP; +import static io.airbyte.workers.process.Metadata.SPEC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; +import static io.airbyte.workers.process.Metadata.WRITE_STEP; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -27,29 +35,8 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher { - /** - * The following variables help, either via names or labels, add metadata to processes actually - * running operations. These are more readable forms of - * {@link io.airbyte.config.JobTypeResourceLimit.JobType}. - */ - public static final String JOB_TYPE = "job_type"; - public static final String SYNC_JOB = "sync"; - public static final String SPEC_JOB = "spec"; - public static final String CHECK_JOB = "check"; - public static final String DISCOVER_JOB = "discover"; - private static final String CONFIG = "--config"; - /** - * A sync job can actually be broken down into the following steps. Try to be as precise as possible - * with naming/labels to help operations. - */ - public static final String SYNC_STEP = "sync_step"; - public static final String READ_STEP = "read"; - public static final String WRITE_STEP = "write"; - public static final String NORMALIZE_STEP = "normalize"; - public static final String CUSTOM_STEP = "custom"; - private final String jobId; private final int attempt; private final String imageName; @@ -94,7 +81,7 @@ public Process spec(final Path jobRoot) throws WorkerException { Collections.emptyMap(), null, resourceRequirement, - Map.of(JOB_TYPE, SPEC_JOB), + Map.of(JOB_TYPE_KEY, SPEC_JOB), getWorkerMetadata(), Collections.emptyMap(), "spec"); @@ -115,7 +102,7 @@ public Process check(final Path jobRoot, final String configFilename, final Stri ImmutableMap.of(configFilename, configContents), null, resourceRequirement, - Map.of(JOB_TYPE, CHECK_JOB), + Map.of(JOB_TYPE_KEY, CHECK_JOB), getWorkerMetadata(), Collections.emptyMap(), "check", @@ -137,7 +124,7 @@ public Process discover(final Path jobRoot, final String configFilename, final S ImmutableMap.of(configFilename, configContents), null, resourceRequirement, - Map.of(JOB_TYPE, DISCOVER_JOB), + Map.of(JOB_TYPE_KEY, DISCOVER_JOB), getWorkerMetadata(), Collections.emptyMap(), "discover", @@ -183,7 +170,7 @@ public Process read(final Path jobRoot, files, null, resourceRequirement, - Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP), getWorkerMetadata(), Collections.emptyMap(), arguments.toArray(new String[arguments.size()])); @@ -213,7 +200,7 @@ public Process write(final Path jobRoot, files, null, resourceRequirement, - Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP), getWorkerMetadata(), Collections.emptyMap(), "write", diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 4f3d83f9890d..8643ba99219e 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -25,11 +25,6 @@ public class KubeProcessFactory implements ProcessFactory { private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class); - private static final String JOB_LABEL_KEY = "job_id"; - private static final String ATTEMPT_LABEL_KEY = "attempt_id"; - private static final String WORKER_POD_LABEL_KEY = "airbyte"; - private static final String WORKER_POD_LABEL_VALUE = "worker-pod"; - private final WorkerConfigs workerConfigs; private final String namespace; private final KubernetesClient fabricClient; @@ -146,13 +141,17 @@ public Process create( } } + /** + * Returns general labels to be applied to all Kubernetes pods. All general labels should be added + * here. + */ public static Map getLabels(final String jobId, final int attemptId, final Map customLabels) { final var allLabels = new HashMap<>(customLabels); final var generalKubeLabels = Map.of( - JOB_LABEL_KEY, jobId, - ATTEMPT_LABEL_KEY, String.valueOf(attemptId), - WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE); + Metadata.JOB_LABEL_KEY, jobId, + Metadata.ATTEMPT_LABEL_KEY, String.valueOf(attemptId), + Metadata.WORKER_POD_LABEL_KEY, Metadata.WORKER_POD_LABEL_VALUE); allLabels.putAll(generalKubeLabels); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/Metadata.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/Metadata.java new file mode 100644 index 000000000000..1b45a61196a8 --- /dev/null +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/Metadata.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.process; + +/** + * The following variables help, either via names or labels, add metadata to processes actually + * running operations to ease operations. + */ +public final class Metadata { + + /** + * General Metadata + */ + static final String JOB_LABEL_KEY = "job_id"; + static final String ATTEMPT_LABEL_KEY = "attempt_id"; + static final String WORKER_POD_LABEL_KEY = "airbyte"; + static final String WORKER_POD_LABEL_VALUE = "job-pod"; + public static final String CONNECTION_ID_LABEL_KEY = "connection_id"; + + /** + * These are more readable forms of {@link io.airbyte.config.JobTypeResourceLimit.JobType}. + */ + public static final String JOB_TYPE_KEY = "job_type"; + public static final String SYNC_JOB = "sync"; + public static final String SPEC_JOB = "spec"; + public static final String CHECK_JOB = "check"; + public static final String DISCOVER_JOB = "discover"; + + /** + * A sync job can actually be broken down into the following steps. Try to be as precise as possible + * with naming/labels to help operations. + */ + public static final String SYNC_STEP_KEY = "sync_step"; + public static final String READ_STEP = "read"; + public static final String WRITE_STEP = "write"; + public static final String NORMALIZE_STEP = "normalize"; + public static final String CUSTOM_STEP = "custom"; + public static final String ORCHESTRATOR_STEP = "orchestrator"; + +} diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java index ed2274be290e..0e69a3b96713 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/LauncherWorker.java @@ -9,6 +9,9 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY; import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; +import static io.airbyte.workers.process.Metadata.CONNECTION_ID_LABEL_KEY; +import static io.airbyte.workers.process.Metadata.ORCHESTRATOR_STEP; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; import com.google.common.base.Stopwatch; import datadog.trace.api.Trace; @@ -57,7 +60,6 @@ @Slf4j public class LauncherWorker implements Worker { - private static final String CONNECTION_ID_LABEL_KEY = "connection_id"; private static final Duration MAX_DELETION_TIMEOUT = Duration.ofSeconds(45); private final UUID connectionId; @@ -138,7 +140,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException final var allLabels = KubeProcessFactory.getLabels( jobRunConfig.getJobId(), Math.toIntExact(jobRunConfig.getAttemptId()), - Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString())); + Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString(), SYNC_STEP_KEY, ORCHESTRATOR_STEP)); final var podNameAndJobPrefix = podNamePrefix + "-job-" + jobRunConfig.getJobId() + "-attempt-"; final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId(); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index 0c3eb0633c06..db3bbc57671e 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -5,6 +5,10 @@ package io.airbyte.workers.normalization; import static io.airbyte.commons.logging.LoggingHelper.RESET; +import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY; +import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -27,7 +31,6 @@ import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.exception.WorkerException; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -85,11 +88,10 @@ void setup() throws IOException, WorkerException { WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config), WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog)); - when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot, + when(processFactory.create(NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot, DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null, workerConfigs.getResourceRequirements(), - Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP, - AirbyteIntegrationLauncher.NORMALIZE_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP), Map.of(), Map.of(), "run", diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index 30c8779a2449..06d0fd0d5935 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -4,14 +4,14 @@ package io.airbyte.workers.process; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP; -import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP; +import static io.airbyte.workers.process.Metadata.CHECK_JOB; +import static io.airbyte.workers.process.Metadata.DISCOVER_JOB; +import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY; +import static io.airbyte.workers.process.Metadata.READ_STEP; +import static io.airbyte.workers.process.Metadata.SPEC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; +import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY; +import static io.airbyte.workers.process.Metadata.WRITE_STEP; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -72,7 +72,7 @@ void spec() throws WorkerException { launcher.spec(JOB_ROOT); Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, Collections.emptyMap(), null, - workerConfigs.getResourceRequirements(), Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SPEC_JOB), JOB_METADATA, + workerConfigs.getResourceRequirements(), Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA, Map.of(), "spec"); } @@ -83,7 +83,7 @@ void check() throws WorkerException { Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null, workerConfigs.getResourceRequirements(), - Map.of(JOB_TYPE, CHECK_JOB), + Map.of(JOB_TYPE_KEY, CHECK_JOB), JOB_METADATA, Map.of(), "check", @@ -96,7 +96,7 @@ void discover() throws WorkerException { Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null, workerConfigs.getResourceRequirements(), - Map.of(JOB_TYPE, DISCOVER_JOB), + Map.of(JOB_TYPE_KEY, DISCOVER_JOB), JOB_METADATA, Map.of(), "discover", @@ -109,7 +109,7 @@ void read() throws WorkerException { Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_CATALOG_STATE_FILES, null, workerConfigs.getResourceRequirements(), - Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP), JOB_METADATA, Map.of(), Lists.newArrayList( @@ -125,7 +125,7 @@ void write() throws WorkerException { Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, true, CONFIG_CATALOG_FILES, null, workerConfigs.getResourceRequirements(), - Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP), + Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP), JOB_METADATA, Map.of(), "write", diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java index b8924253bee6..504285932a7b 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/process/ProcessFactoryTest.java @@ -4,6 +4,8 @@ package io.airbyte.workers.process; +import static io.airbyte.workers.process.Metadata.SYNC_JOB; + import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -11,7 +13,7 @@ class ProcessFactoryTest { @Test void getPodNameNormal() { - final var name = ProcessFactory.createProcessName("airbyte/tester:1", AirbyteIntegrationLauncher.SYNC_JOB, "1", 10, + final var name = ProcessFactory.createProcessName("airbyte/tester:1", SYNC_JOB, "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("tester-sync-1-10-", withoutRandSuffix); @@ -21,7 +23,7 @@ void getPodNameNormal() { void getPodNameTruncated() { final var name = ProcessFactory.createProcessName("airbyte/very-very-very-long-name-longer-than-63-chars:2", - AirbyteIntegrationLauncher.SYNC_JOB, "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); + SYNC_JOB, "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); Assertions.assertEquals("very-very-very-long-name-longer-than-63-chars-sync-1-10-", withoutRandSuffix); } @@ -29,7 +31,7 @@ void getPodNameTruncated() { @Test void testHandlingDashAsFirstCharacter() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = ProcessFactory.createProcessName("airbyte/source-google-adwordsv2:latest", AirbyteIntegrationLauncher.SYNC_JOB, + final var name = ProcessFactory.createProcessName("airbyte/source-google-adwordsv2:latest", SYNC_JOB, uuid, 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); @@ -39,7 +41,7 @@ void testHandlingDashAsFirstCharacter() { @Test void testOnlyDashes() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = ProcessFactory.createProcessName("--------", AirbyteIntegrationLauncher.SYNC_JOB, uuid, + final var name = ProcessFactory.createProcessName("--------", SYNC_JOB, uuid, 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); @@ -49,7 +51,7 @@ void testOnlyDashes() { @Test void testOnlyNumeric() { final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330"; - final var name = ProcessFactory.createProcessName("0000000000", AirbyteIntegrationLauncher.SYNC_JOB, uuid, + final var name = ProcessFactory.createProcessName("0000000000", SYNC_JOB, uuid, 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT); final var withoutRandSuffix = name.substring(0, name.length() - 5); diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java index b4d73416e743..cd5ed8d7694a 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/orchestrator/ReplicationJobOrchestrator.java @@ -105,7 +105,7 @@ public Optional runJob() throws Exception { // At this moment, if either source or destination is from custom connector image, we will put all // jobs into isolated pool to run. - boolean useIsolatedPool = sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector(); + final boolean useIsolatedPool = sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector(); log.info("Setting up source launcher..."); final var sourceLauncher = new AirbyteIntegrationLauncher( sourceLauncherConfig.getJobId(),