Skip to content

Commit

Permalink
chore: wrangle file/container constants into central location. (#13625)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Aug 22, 2024
1 parent df199d2 commit be843ca
Show file tree
Hide file tree
Showing 28 changed files with 157 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public class KubePodProcess implements KubePod {
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);

public static final String MAIN_CONTAINER_NAME = "main";
public static final String SIDECAR_CONTAINER_NAME = "connector-sidecar";
public static final String INIT_CONTAINER_NAME = "init";

private static final String PIPES_DIR = "/pipes";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.pod.FileConstants;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.KubeContainerInfo;
Expand Down Expand Up @@ -147,7 +148,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
OrchestratorConstants.INIT_FILE_INPUT, Jsons.serialize(input)));
FileConstants.INIT_INPUT_FILE, Jsons.serialize(input)));

final Map<Integer, Integer> portMap = Map.of(
serverPort, serverPort,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.airbyte.workers.pod

object ContainerConstants {
// shared containers
const val INIT_CONTAINER_NAME = "init"

// replication containers
const val ORCHESTRATOR_CONTAINER_NAME = "orchestrator"
const val SOURCE_CONTAINER_NAME = "source"
const val DESTINATION_CONTAINER_NAME = "destination"

// connector operation containers
const val MAIN_CONTAINER_NAME = "main"
const val SIDECAR_CONTAINER_NAME = "connector-sidecar"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.airbyte.workers.pod

object FileConstants {
// dirs
const val SOURCE_DIR = "/source"
const val DEST_DIR = "/dest"
const val CONFIG_DIR = "/config"

// pipes
const val STDIN_PIPE_FILE = "stdin"
const val STDOUT_PIPE_FILE = "stdout"
const val STDERR_PIPE_FILE = "stderr"

// output files
const val EXIT_CODE_FILE = "exitCode.txt"
const val JOB_OUTPUT_FILE = "jobOutput.json"

// input and configuration files
const val CONNECTION_CONFIGURATION_FILE = "connectionConfiguration.json"
const val INIT_INPUT_FILE = "input.json"
const val SIDECAR_INPUT_FILE = "sidecarInput.json"
const val SOURCE_CONFIG_FILE = "sourceConfig.json"
const val DESTINATION_CONFIG_FILE = "destinationConfig.json"
const val SOURCE_CATALOG_FILE = "sourceCatalog.json"
const val DESTINATION_CATALOG_FILE = "destinationCatalog.json"
const val INPUT_STATE_FILE = "inputState.json"

// marker files
const val TERMINATION_MARKER_FILE = "TERMINATED"
const val KUBE_CP_SUCCESS_MARKER_FILE = "FINISHED_UPLOADING"
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,6 @@ private const val LOG_LEVEL = "LOG_LEVEL"
private const val S3_PATH_STYLE_ACCESS = "S3_PATH_STYLE_ACCESS"

object OrchestratorConstants {
const val JOB_OUTPUT_FILENAME = "jobOutput.json"
const val CONNECTION_CONFIGURATION = "connectionConfiguration.json"
const val EXIT_CODE_FILE = "exitCode.txt"
const val INIT_FILE_INPUT = "input.json"
const val INIT_FILE_JOB_RUN_CONFIG = "jobRunConfig.json"
const val INIT_FILE_APPLICATION = "application.txt"
const val APPLICATION = "application"
const val SIDECAR_INPUT = "sidecarInput.json"

// See the application.yml of the container-orchestrator for value
const val SERVER_PORT = 9000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import io.airbyte.workers.internal.AirbyteStreamFactory
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory
import io.airbyte.workers.internal.VersionedAirbyteStreamFactory.InvalidLineFailureConfiguration
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.workload.JobOutputDocStore
import io.airbyte.workload.api.client.WorkloadApiClient
import io.airbyte.workload.api.client.model.generated.WorkloadFailureRequest
Expand Down Expand Up @@ -53,7 +53,7 @@ class ConnectorWatcher(
private val logContextFactory: SidecarLogContextFactory,
) {
fun run() {
val input = Jsons.deserialize(readFile(OrchestratorConstants.SIDECAR_INPUT), SidecarInput::class.java)
val input = Jsons.deserialize(readFile(FileConstants.SIDECAR_INPUT_FILE), SidecarInput::class.java)
withLoggingContext(logContextFactory.create(input.logPath)) {
LineGobbler.startSection(input.operationType.toString())
val checkConnectionConfiguration = input.checkConnectionInput
Expand Down Expand Up @@ -86,13 +86,13 @@ class ConnectorWatcher(
}
logger.info { "Connector exited, processing output" }
val outputIS =
if (Files.exists(Path.of(OrchestratorConstants.JOB_OUTPUT_FILENAME))) {
logger.info { "Output file ${OrchestratorConstants.JOB_OUTPUT_FILENAME} found" }
Files.newInputStream(Path.of(OrchestratorConstants.JOB_OUTPUT_FILENAME))
if (Files.exists(Path.of(FileConstants.JOB_OUTPUT_FILE))) {
logger.info { "Output file ${FileConstants.JOB_OUTPUT_FILE} found" }
Files.newInputStream(Path.of(FileConstants.JOB_OUTPUT_FILE))
} else {
InputStream.nullInputStream()
}
val exitCode = readFile(OrchestratorConstants.EXIT_CODE_FILE).trim().toInt()
val exitCode = readFile(FileConstants.EXIT_CODE_FILE).trim().toInt()
logger.info { "Connector exited with $exitCode" }
val streamFactory: AirbyteStreamFactory = getStreamFactory(integrationLauncherConfig)
val connectorOutput: ConnectorJobOutput =
Expand Down Expand Up @@ -164,7 +164,7 @@ class ConnectorWatcher(

@VisibleForTesting
fun areNeededFilesPresent(): Boolean {
return Files.exists(outputPath) && Files.exists(Path.of(configDir, OrchestratorConstants.EXIT_CODE_FILE))
return Files.exists(outputPath) && Files.exists(Path.of(configDir, FileConstants.EXIT_CODE_FILE))
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.airbyte.connectorSidecar.config

import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workers.pod.FileConstants
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
Expand All @@ -13,7 +12,7 @@ class ConfigFactory {
/**
* Returns the config directory which contains all the configuration files.
*
* @param configDir optional directory, defaults to KubePodProcess.CONFIG_DIR if not defined.
* @param configDir optional directory, defaults to FileConstants.CONFIG_DIR if not defined.
* @return Configuration directory.
*/
@Singleton
Expand All @@ -22,7 +21,7 @@ class ConfigFactory {
@Value("\${airbyte.config-dir}") configDir: String?,
): String {
if (configDir == null) {
return KubePodProcess.CONFIG_DIR
return FileConstants.CONFIG_DIR
}
return configDir
}
Expand All @@ -33,6 +32,6 @@ class ConfigFactory {
@Singleton
@Named("output")
fun output(): Path {
return Path.of(OrchestratorConstants.JOB_OUTPUT_FILENAME)
return Path.of(FileConstants.JOB_OUTPUT_FILE)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import io.airbyte.workers.helper.GsonPksExtractor
import io.airbyte.workers.internal.AirbyteStreamFactory
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.models.SidecarInput.OperationType
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.workload.JobOutputDocStore
import io.airbyte.workload.api.client.WorkloadApiClient
import io.airbyte.workload.api.client.generated.WorkloadApi
Expand Down Expand Up @@ -92,7 +92,7 @@ class ConnectorWatchTest {
),
)

every { connectorWatcher.readFile(OrchestratorConstants.EXIT_CODE_FILE) } returns "0"
every { connectorWatcher.readFile(FileConstants.EXIT_CODE_FILE) } returns "0"

every { connectorWatcher.areNeededFilesPresent() } returns true

Expand All @@ -114,7 +114,7 @@ class ConnectorWatchTest {
ConnectorJobOutput()
.withCheckConnection(StandardCheckConnectionOutput().withStatus(StandardCheckConnectionOutput.Status.SUCCEEDED))

every { connectorWatcher.readFile(OrchestratorConstants.SIDECAR_INPUT) } returns
every { connectorWatcher.readFile(FileConstants.SIDECAR_INPUT_FILE) } returns
Jsons.serialize(SidecarInput(checkInput, discoveryInput, workloadId, IntegrationLauncherConfig(), operationType, ""))

every { connectorMessageProcessor.run(any(), any(), any(), any(), eq(operationType)) } returns output
Expand All @@ -138,7 +138,7 @@ class ConnectorWatchTest {
ConnectorJobOutput()
.withCheckConnection(StandardCheckConnectionOutput().withStatus(StandardCheckConnectionOutput.Status.FAILED))

every { connectorWatcher.readFile(OrchestratorConstants.SIDECAR_INPUT) } returns
every { connectorWatcher.readFile(FileConstants.SIDECAR_INPUT_FILE) } returns
Jsons.serialize(SidecarInput(checkInput, discoveryInput, workloadId, IntegrationLauncherConfig(), operationType, ""))

every { connectorMessageProcessor.run(any(), any(), any(), any(), eq(operationType)) } returns output
Expand Down Expand Up @@ -166,7 +166,7 @@ class ConnectorWatchTest {
OperationType.SPEC -> connectorWatcher.getFailedOutput("", exception)
}

every { connectorWatcher.readFile(OrchestratorConstants.SIDECAR_INPUT) } returns
every { connectorWatcher.readFile(FileConstants.SIDECAR_INPUT_FILE) } returns
Jsons.serialize(SidecarInput(checkInput, discoveryInput, workloadId, IntegrationLauncherConfig().withDockerImage(""), operationType, ""))

every { connectorMessageProcessor.run(any(), any(), any(), any(), eq(operationType)) } throws exception
Expand Down Expand Up @@ -196,7 +196,7 @@ class ConnectorWatchTest {
@ParameterizedTest
@EnumSource(OperationType::class)
fun `run for failed with file timeout`(operationType: OperationType) {
every { connectorWatcher.readFile(OrchestratorConstants.SIDECAR_INPUT) } returns
every { connectorWatcher.readFile(FileConstants.SIDECAR_INPUT_FILE) } returns
Jsons.serialize(SidecarInput(checkInput, discoveryInput, workloadId, IntegrationLauncherConfig(), operationType, ""))

every { connectorWatcher.areNeededFilesPresent() } returns false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import io.airbyte.commons.json.Jsons;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.pod.FileConstants;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.process.KubePodProcess;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import jakarta.annotation.Nullable;
Expand All @@ -26,14 +26,14 @@ public class ConfigFactory {
/**
* Returns the config directory which contains all the configuration files.
*
* @param configDir optional directory, defaults to KubePodProcess.CONFIG_DIR if not defined.
* @param configDir optional directory, defaults to FileConstants.CONFIG_DIR if not defined.
* @return Configuration directory.
*/
@Singleton
@Named("configDir")
String configDir(@Value("${airbyte.config-dir}") @Nullable final String configDir) {
if (configDir == null) {
return KubePodProcess.CONFIG_DIR;
return FileConstants.CONFIG_DIR;
}
return configDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import io.airbyte.commons.envvar.EnvVar;
import io.airbyte.commons.json.Jsons;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.sync.OrchestratorConstants;
import io.airbyte.workers.pod.FileConstants;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
Expand All @@ -23,7 +22,7 @@ public interface JobOrchestrator<INPUT> {
Class<INPUT> getInputClass();

default Path getConfigDir() {
return Path.of(KubePodProcess.CONFIG_DIR);
return Path.of(FileConstants.CONFIG_DIR);
}

/**
Expand All @@ -34,7 +33,7 @@ default Path getConfigDir() {
*/
default INPUT readInput() throws IOException {
return Jsons.deserialize(
getConfigDir().resolve(OrchestratorConstants.INIT_FILE_INPUT).toFile(),
getConfigDir().resolve(FileConstants.INIT_INPUT_FILE).toFile(),
getInputClass());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.CheckConnectionInputHydrator
import io.airbyte.workers.models.CheckConnectionInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand All @@ -26,12 +26,12 @@ class CheckHydrationProcessor(
val hydrated = inputHydrator.getHydratedStandardCheckInput(parsed.checkConnectionInput)

fileClient.writeInputFile(
OrchestratorConstants.CONNECTION_CONFIGURATION,
FileConstants.CONNECTION_CONFIGURATION_FILE,
serializer.serialize(hydrated.connectionConfiguration),
)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
FileConstants.SIDECAR_INPUT_FILE,
serializer.serialize(
SidecarInput(
hydrated,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.DiscoverCatalogInputHydrator
import io.airbyte.workers.models.DiscoverCatalogInput
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand All @@ -26,12 +26,12 @@ class DiscoverHydrationProcessor(
val hydrated = inputHydrator.getHydratedStandardDiscoverInput(parsed.discoverCatalogInput)

fileClient.writeInputFile(
OrchestratorConstants.CONNECTION_CONFIGURATION,
FileConstants.CONNECTION_CONFIGURATION_FILE,
serializer.serialize(hydrated.connectionConfiguration),
)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
FileConstants.SIDECAR_INPUT_FILE,
serializer.serialize(
SidecarInput(
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import io.airbyte.workers.ReplicationInputHydrator
import io.airbyte.workers.input.setDestinationLabels
import io.airbyte.workers.input.setSourceLabels
import io.airbyte.workers.models.ReplicationActivityInput
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.pod.PodLabeler
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand Down Expand Up @@ -46,7 +46,7 @@ class ReplicationHydrationProcessor(
.setDestinationLabels(labels)

fileClient.writeInputFile(
OrchestratorConstants.INIT_FILE_INPUT,
FileConstants.INIT_INPUT_FILE,
serializer.serialize(inputWithLabels),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package io.airbyte.initContainer.input
import io.airbyte.initContainer.system.FileClient
import io.airbyte.workers.models.SidecarInput
import io.airbyte.workers.models.SpecInput
import io.airbyte.workers.pod.FileConstants
import io.airbyte.workers.serde.ObjectSerializer
import io.airbyte.workers.serde.PayloadDeserializer
import io.airbyte.workers.sync.OrchestratorConstants
import io.airbyte.workload.api.client.model.generated.Workload
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton
Expand All @@ -22,7 +22,7 @@ class SpecHydrationProcessor(
val parsed: SpecInput = deserializer.toSpecInput(rawPayload)

fileClient.writeInputFile(
OrchestratorConstants.SIDECAR_INPUT,
FileConstants.SIDECAR_INPUT_FILE,
serializer.serialize(
SidecarInput(
null,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.airbyte.initContainer.system

import io.airbyte.workers.process.KubePodProcess
import io.airbyte.workers.pod.FileConstants
import jakarta.inject.Singleton
import java.nio.charset.StandardCharsets
import java.nio.file.Files
Expand All @@ -19,7 +19,7 @@ class FileClient {
fileContents: String,
) {
Files.writeString(
Path.of(KubePodProcess.CONFIG_DIR).resolve(fileName),
Path.of(FileConstants.CONFIG_DIR).resolve(fileName),
fileContents,
StandardCharsets.UTF_8,
)
Expand Down
Loading

0 comments on commit be843ca

Please sign in to comment.