Skip to content

Commit

Permalink
convert EnvVariableFeatureFlag to an injected dependency (#21171)
Browse files Browse the repository at this point in the history
* inject feature-flag client into integration-launcher

* inject feature-flag client into message-tracker

* inject feature-flag client into DefaultAirbyteSource

* missed reference in message-tracker
  • Loading branch information
colesnodgrass authored and jbfbell committed Jan 13, 2023
1 parent 53557c3 commit 1a25de2
Show file tree
Hide file tree
Showing 15 changed files with 113 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -56,23 +56,27 @@ public class DefaultAirbyteSource implements AirbyteSource {
private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages();
private final FeatureFlags featureFlags;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER));
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final FeatureFlags featureFlags) {
this(integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER), featureFlags);
}

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher, final AirbyteStreamFactory streamFactory) {
this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION));
public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final FeatureFlags featureFlags) {
this(integrationLauncher, streamFactory, new HeartbeatMonitor(HEARTBEAT_FRESH_DURATION), featureFlags);
}

@VisibleForTesting
DefaultAirbyteSource(final IntegrationLauncher integrationLauncher,
final AirbyteStreamFactory streamFactory,
final HeartbeatMonitor heartbeatMonitor) {
final HeartbeatMonitor heartbeatMonitor,
final FeatureFlags featureFlags) {
this.integrationLauncher = integrationLauncher;
this.streamFactory = streamFactory;
this.heartbeatMonitor = heartbeatMonitor;
this.featureFlags = featureFlags;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -167,7 +171,7 @@ public void cancel() throws Exception {
}

private void logInitialStateAsJSON(final WorkerSourceConfig sourceConfig) {
if (!logConnectorMessages) {
if (!featureFlags.logConnectorMessages()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import datadog.trace.api.Trace;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class AirbyteMessageTracker implements MessageTracker {
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
private final StateAggregator stateAggregator;
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages();
private final FeatureFlags featureFlags;

// These variables support SYNC level estimates and are meant for sources where stream level
// estimates are not possible e.g. CDC sources.
Expand All @@ -91,16 +91,18 @@ private enum ConnectorType {
DESTINATION
}

public AirbyteMessageTracker() {
public AirbyteMessageTracker(final FeatureFlags featureFlags) {
this(new StateDeltaTracker(STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES),
new DefaultStateAggregator(new EnvVariableFeatureFlags().useStreamCapableState()),
new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT));
new DefaultStateAggregator(featureFlags.useStreamCapableState()),
new StateMetricsTracker(STATE_METRICS_TRACKER_MESSAGE_LIMIT),
featureFlags);
}

@VisibleForTesting
protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
final StateAggregator stateAggregator,
final StateMetricsTracker stateMetricsTracker) {
final StateMetricsTracker stateMetricsTracker,
final FeatureFlags featureFlags) {
this.sourceOutputState = new AtomicReference<>();
this.destinationOutputState = new AtomicReference<>();
this.streamToRunningCount = new HashMap<>();
Expand All @@ -115,6 +117,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.destinationErrorTraceMessages = new ArrayList<>();
this.sourceErrorTraceMessages = new ArrayList<>();
this.stateAggregator = stateAggregator;
this.featureFlags = featureFlags;
}

@Trace(operationName = WORKER_OPERATION_NAME)
Expand Down Expand Up @@ -542,7 +545,7 @@ public Boolean getUnreliableStateTimingMetrics() {
}

private void logMessageAsJSON(final String caller, final AirbyteMessage message) {
if (!logConnectorMessages) {
if (!featureFlags.logConnectorMessages()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {

/**
* If true, launcher will use a separated isolated pool to run the job.
*
* <p>
* At this moment, we put custom connector jobs into an isolated pool.
*/
private final boolean useIsolatedPool;
Expand All @@ -56,13 +56,14 @@ public AirbyteIntegrationLauncher(final String jobId,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement,
final boolean useIsolatedPool) {
final boolean useIsolatedPool,
final FeatureFlags featureFlags) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = new EnvVariableFeatureFlags();
this.featureFlags = featureFlags;
this.useIsolatedPool = useIsolatedPool;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.general;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ReplicationWorkerPerformanceTest {
public void executeOneSync() throws InterruptedException {
final var perSource = new LimitedAirbyteSource();
final var perDestination = new EmptyAirbyteDestination();
final var messageTracker = new AirbyteMessageTracker();
final var messageTracker = new AirbyteMessageTracker(new EnvVariableFeatureFlags());
final var connectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
Expand Down Expand Up @@ -80,6 +82,8 @@ class DefaultAirbyteSourceTest {

private static Path logJobRoot;

private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags();

static {
try {
logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test");
Expand Down Expand Up @@ -137,7 +141,7 @@ void testSuccessfulLifecycle() throws Exception {

when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);

final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

final List<AirbyteMessage> messages = Lists.newArrayList();
Expand Down Expand Up @@ -173,7 +177,7 @@ void testTaggedLogs() throws Exception {
when(heartbeatMonitor.isBeating()).thenReturn(true).thenReturn(false);

final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory,
heartbeatMonitor);
heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

final List<AirbyteMessage> messages = Lists.newArrayList();
Expand All @@ -198,7 +202,7 @@ void testTaggedLogs() throws Exception {

@Test
void testNonzeroExitCodeThrows() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
tap.start(SOURCE_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);
Expand All @@ -208,7 +212,7 @@ void testNonzeroExitCodeThrows() throws Exception {

@Test
void testIgnoredExitCodes() throws Exception {
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
tap.start(SOURCE_CONFIG, jobRoot);
when(process.isAlive()).thenReturn(false);

Expand All @@ -220,7 +224,7 @@ void testIgnoredExitCodes() throws Exception {

@Test
void testGetExitValue() throws Exception {
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor);
final AirbyteSource source = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor, featureFlags);
source.start(SOURCE_CONFIG, jobRoot);

when(process.isAlive()).thenReturn(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
Expand Down Expand Up @@ -49,7 +50,7 @@ class AirbyteMessageTrackerTest {
@BeforeEach
void setup() {
final StateMetricsTracker stateMetricsTracker = new StateMetricsTracker(10L * 1024L * 1024L);
this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker);
this.messageTracker = new AirbyteMessageTracker(mStateDeltaTracker, mStateAggregator, stateMetricsTracker, new EnvVariableFeatureFlags());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.WorkerConfigs;
Expand Down Expand Up @@ -49,14 +50,16 @@ class AirbyteIntegrationLauncherTest {
CONFIG, "{}",
CATALOG, "{}",
"state", "{}");

private static final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
private static final Map<String, String> JOB_METADATA = Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(new EnvVariableFeatureFlags().autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(new EnvVariableFeatureFlags().applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, new EnvVariableFeatureFlags().fieldSelectionWorkspaces());
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()),
EnvVariableFeatureFlags.AUTO_DETECT_SCHEMA, String.valueOf(featureFlags.autoDetectSchema()),
EnvVariableFeatureFlags.APPLY_FIELD_SELECTION, String.valueOf(featureFlags.applyFieldSelection()),
EnvVariableFeatureFlags.FIELD_SELECTION_WORKSPACES, featureFlags.fieldSelectionWorkspaces());

private WorkerConfigs workerConfigs;
@Mock
Expand All @@ -66,7 +69,8 @@ class AirbyteIntegrationLauncherTest {
@BeforeEach
void setUp() {
workerConfigs = new WorkerConfigs(new EnvConfigs());
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false);
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), false,
featureFlags);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.features;

/**
* Interface that describe which features are activated in airbyte. Currently the only
* Interface that describe which features are activated in airbyte. Currently, the only
* implementation relies on env. Ideally it should be on some DB.
*/
public interface FeatureFlags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public Optional<String> runJob() throws Exception {
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getSourceResourceRequirements(),
useIsolatedPool);
useIsolatedPool,
featureFlags);

log.info("Setting up destination launcher...");
final var destinationLauncher = new AirbyteIntegrationLauncher(
Expand All @@ -132,15 +133,16 @@ public Optional<String> runJob() throws Exception {
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getDestinationResourceRequirements(),
useIsolatedPool);
useIsolatedPool,
featureFlags);

log.info("Setting up source...");
// reset jobs use an empty source to induce resetting all data in destination.
final var airbyteSource =
WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) ? new EmptyAirbyteSource(
featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(sourceLauncher,
getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER));
getStreamFactory(sourceLauncherConfig.getProtocolVersion(), DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER), featureFlags);

MetricClientFactory.initialize(MetricEmittingApps.WORKER);
final var metricClient = MetricClientFactory.getMetricClient();
Expand All @@ -156,7 +158,7 @@ public Optional<String> runJob() throws Exception {
new DefaultAirbyteDestination(destinationLauncher, getStreamFactory(destinationLauncherConfig.getProtocolVersion(),
DefaultAirbyteDestination.CONTAINER_LOG_MDC_BUILDER),
new VersionedAirbyteMessageBufferedWriterFactory(serDeProvider, migratorFactory, destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(),
new AirbyteMessageTracker(featureFlags),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter,
new ConnectorConfigUpdater(sourceApi, destinationApi),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
Expand Down Expand Up @@ -1270,14 +1271,14 @@ protected void assertNamespaceNormalization(final String testCaseId,
private ConnectorSpecification runSpec() throws WorkerException {
return convertProtocolObject(
new DefaultGetSpecWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false))
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot).getSpec(),
ConnectorSpecification.class);
}

protected StandardCheckConnectionOutput runCheck(final JsonNode config) throws WorkerException {
return new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false),
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
Expand All @@ -1287,7 +1288,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(
final JsonNode config) {
try {
final StandardCheckConnectionOutput standardCheckConnectionOutput = new DefaultCheckConnectionWorker(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false),
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()),
mConnectorConfigUpdater)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot)
.getCheckConnection();
Expand All @@ -1300,7 +1301,7 @@ protected StandardCheckConnectionOutput.Status runCheckWithCatchedException(

protected AirbyteDestination getDestination() {
return new DefaultAirbyteDestination(
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false));
new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, null, false, new EnvVariableFeatureFlags()));
}

protected void runSyncAndVerifyStateOutput(final JsonNode config,
Expand Down
Loading

0 comments on commit 1a25de2

Please sign in to comment.