Skip to content

Commit

Permalink
Moving TrackingClientSingleton.initialize into the bean itself (#17631)
Browse files Browse the repository at this point in the history
* enable testing for micronaut

* move tracking client initializer into the bean

* pmd fix
  • Loading branch information
xiaohansong authored Oct 6, 2022
1 parent c1e0c77 commit 559a79e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,18 @@

package io.airbyte.workers;

import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.temporal.TemporalInitializationUtils;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.MaxWorkersConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.check.DatabaseCheckException;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.workers.config.WorkerMode;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
Expand Down Expand Up @@ -68,24 +61,16 @@
@Slf4j
public class ApplicationInitializer implements ApplicationEventListener<ServiceReadyEvent> {

@Value("${airbyte.role}")
private String airbyteRole;
@Inject
private AirbyteVersion airbyteVersion;
@Inject
@Named("checkConnectionActivities")
private Optional<List<Object>> checkConnectionActivities;
@Inject
@Named("configsDatabaseMigrationCheck")
private Optional<DatabaseMigrationCheck> configsDatabaseMigrationCheck;
@Inject
private Optional<ConfigRepository> configRepository;
@Inject
@Named("connectionManagerActivities")
private Optional<List<Object>> connectionManagerActivities;
@Inject
private DeploymentMode deploymentMode;
@Inject
@Named("discoverActivities")
private Optional<List<Object>> discoverActivities;
@Inject
Expand All @@ -97,8 +82,7 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
@Inject
@Named("jobsDatabaseAvailabilityCheck")
private Optional<JobsDatabaseAvailabilityCheck> jobsDatabaseAvailabilityCheck;
@Inject
private Optional<JobPersistence> jobPersistence;

@Inject
private Optional<LogConfigs> logConfigs;
@Value("${airbyte.worker.check.max-workers}")
Expand Down Expand Up @@ -136,8 +120,6 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
@Value("${airbyte.temporal.worker.ports}")
private Set<Integer> temporalWorkerPorts;
@Inject
private Optional<TrackingStrategy> trackingStrategy;
@Inject
private WorkerEnvironment workerEnvironment;
@Inject
private WorkerFactory workerFactory;
Expand Down Expand Up @@ -200,14 +182,6 @@ private void initializeControlPlaneDependencies() throws DatabaseCheckException,
// Ensure that the Jobs database is available
log.info("Checking jobs database availability...");
jobsDatabaseAvailabilityCheck.orElseThrow().check();

TrackingClientSingleton.initialize(
trackingStrategy.orElseThrow(),
new Deployment(deploymentMode, jobPersistence.orElseThrow().getDeployment().orElseThrow(),
workerEnvironment),
airbyteRole,
airbyteVersion,
configRepository.orElseThrow());
}

private void registerWorkerFactory(final WorkerFactory workerFactory, final MaxWorkersConfig maxWorkersConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

package io.airbyte.workers.config;

import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.DefaultJobCreator;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.factory.DefaultSyncJobFactory;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.persistence.job.factory.SyncJobFactory;
Expand All @@ -23,7 +29,9 @@
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.WorkerFactory;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;

/**
* Micronaut bean factory for Temporal-related singletons.
Expand All @@ -33,7 +41,23 @@ public class TemporalBeanFactory {

@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public TrackingClient trackingClient() {
public TrackingClient trackingClient(final Optional<TrackingStrategy> trackingStrategy,
final DeploymentMode deploymentMode,
Optional<JobPersistence> jobPersistence,
WorkerEnvironment workerEnvironment,
@Value("${airbyte.role}") String airbyteRole,
AirbyteVersion airbyteVersion,
Optional<ConfigRepository> configRepository)
throws IOException {

TrackingClientSingleton.initialize(
trackingStrategy.orElseThrow(),
new Deployment(deploymentMode, jobPersistence.orElseThrow().getDeployment().orElseThrow(),
workerEnvironment),
airbyteRole,
airbyteVersion,
configRepository.orElseThrow());

return TrackingClientSingleton.get();
}

Expand Down

0 comments on commit 559a79e

Please sign in to comment.