Skip to content

Commit

Permalink
Specify namespace when creating pod (#19399)
Browse files Browse the repository at this point in the history
* Specify namespace when creating pof

* PR comments

* rm new line

* Fix micronaut injection
  • Loading branch information
benmoriceau authored Nov 15, 2022
1 parent 89b7740 commit a586537
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public AsyncKubePodStatus getDocStoreStatus() {
public void create(final Map<String, String> allLabels,
final ResourceRequirements resourceRequirements,
final Map<String, String> fileMap,
final Map<Integer, Integer> portMap) {
final Map<Integer, Integer> portMap,
final Map<String, String> nodeSelectors) {
final List<Volume> volumes = new ArrayList<>();
final List<VolumeMount> volumeMounts = new ArrayList<>();
final List<EnvVar> envVars = new ArrayList<>();
Expand Down Expand Up @@ -352,6 +353,7 @@ public void create(final Map<String, String> allLabels,
.withContainers(mainContainer)
.withInitContainers(initContainer)
.withVolumes(volumes)
.withNodeSelector(nodeSelectors)
.endSpec()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public DbtLauncherWorker(final UUID connectionId,
Void.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AsyncKubePodStatus;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
private final Supplier<ActivityExecutionContext> activityContext;
private final Integer serverPort;
private final TemporalUtils temporalUtils;
private final WorkerConfigs workerConfigs;

private final AtomicBoolean cancelled = new AtomicBoolean(false);
private AsyncOrchestratorPodProcess process;
Expand All @@ -84,7 +86,8 @@ public LauncherWorker(final UUID connectionId,
final Class<OUTPUT> outputClass,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {

this.connectionId = connectionId;
this.application = application;
Expand All @@ -97,6 +100,7 @@ public LauncherWorker(final UUID connectionId,
this.activityContext = activityContext;
this.serverPort = serverPort;
this.temporalUtils = temporalUtils;
this.workerConfigs = workerConfigs;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -174,7 +178,8 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
allLabels,
resourceRequirements,
fileMap,
portMap);
portMap,
workerConfigs.getworkerKubeNodeSelectors());
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public NormalizationLauncherWorker(final UUID connectionId,
NormalizationSummary.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.WorkerConfigs;
import io.temporal.activity.ActivityExecutionContext;
import java.util.Map;
import java.util.UUID;
Expand All @@ -37,7 +38,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
final ResourceRequirements resourceRequirements,
final Supplier<ActivityExecutionContext> activityContext,
final Integer serverPort,
final TemporalUtils temporalUtils) {
final TemporalUtils temporalUtils,
final WorkerConfigs workerConfigs) {
super(
connectionId,
REPLICATION,
Expand All @@ -51,7 +53,8 @@ public ReplicationLauncherWorker(final UUID connectionId,
ReplicationOutput.class,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class ReplicationActivityImpl implements ReplicationActivity {
private final AirbyteApiClient airbyteApiClient;
private final AirbyteMessageSerDeProvider serDeProvider;
private final AirbyteMessageVersionedMigratorFactory migratorFactory;
private final WorkerConfigs workerConfigs;

public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig,
@Named("replicationProcessFactory") final ProcessFactory processFactory,
Expand All @@ -111,7 +113,8 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
final TemporalUtils temporalUtils,
final AirbyteApiClient airbyteApiClient,
final AirbyteMessageSerDeProvider serDeProvider,
final AirbyteMessageVersionedMigratorFactory migratorFactory) {
final AirbyteMessageVersionedMigratorFactory migratorFactory,
@Named("replicationWorkerConfigs") final WorkerConfigs workerConfigs) {
this.containerOrchestratorConfig = containerOrchestratorConfig;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
Expand All @@ -126,6 +129,7 @@ public ReplicationActivityImpl(@Named("containerOrchestratorConfig") final Optio
this.airbyteApiClient = airbyteApiClient;
this.serDeProvider = serDeProvider;
this.migratorFactory = migratorFactory;
this.workerConfigs = workerConfigs;
}

// Marking task queue as nullable because we changed activity signature; thus runs started before
Expand Down Expand Up @@ -166,7 +170,8 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig,
destinationLauncherConfig,
jobRunConfig,
syncInput.getResourceRequirements(),
() -> context);
() -> context,
workerConfigs);
} else {
workerFactory =
getLegacyWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput);
Expand Down Expand Up @@ -295,7 +300,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
final IntegrationLauncherConfig destinationLauncherConfig,
final JobRunConfig jobRunConfig,
final ResourceRequirements resourceRequirements,
final Supplier<ActivityExecutionContext> activityContext)
final Supplier<ActivityExecutionContext> activityContext,
final WorkerConfigs workerConfigs)
throws ApiException {
final JobIdRequestBody id = new JobIdRequestBody();
id.setId(Long.valueOf(jobRunConfig.getJobId()));
Expand All @@ -313,7 +319,8 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
resourceRequirements,
activityContext,
serverPort,
temporalUtils);
temporalUtils,
workerConfigs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ public void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Inte
.filter(entry -> OrchestratorConstants.ENV_VARS_TO_TRANSFER.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

final WorkerConfigs workerConfigs = new WorkerConfigs(new EnvConfigs());

asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP,
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap);
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors());

// a final activity waits until there is output from the kube pod process
asyncProcess.waitFor(10, TimeUnit.SECONDS);
Expand Down

0 comments on commit a586537

Please sign in to comment.