Skip to content

Commit

Permalink
Clean up control flow actions (#6616)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed May 16, 2023
1 parent 382aeff commit e49c8b4
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.context;

/**
* Feature flags to consider during a Replication job.
*/
public record ReplicationFeatureFlags(boolean shouldCommitStateAsap, boolean shouldCommitStatsAsap, boolean shouldHandleStreamStatus) {

}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;

import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.HandleStreamStatus;
import io.airbyte.featureflag.Workspace;
import io.airbyte.workers.context.ReplicationContext;
import io.airbyte.workers.context.ReplicationFeatureFlags;

/**
* Read features flags we need to consider during a sync.
*/
public class ReplicationFeatureFlagReader {

private final FeatureFlagClient featureFlagClient;

public ReplicationFeatureFlagReader(final FeatureFlagClient featureFlagClient) {
this.featureFlagClient = featureFlagClient;
}

/**
* Read Feature flags we need to consider during a sync.
*
* @param replicationContext the context of the sync.
* @param syncInput the input of the sync.
* @return The flags.
*/
public ReplicationFeatureFlags readReplicationFeatureFlags(final ReplicationContext replicationContext, final StandardSyncInput syncInput) {
return new ReplicationFeatureFlags(
ReplicationFeatureFlagReader.shouldCommitStateAsap(syncInput),
ReplicationFeatureFlagReader.shouldCommitStatsAsap(syncInput),
shouldHandleStreamStatus(replicationContext));
}

/**
* Helper function to read the shouldCommitStateAsap feature flag.
*/
static boolean shouldCommitStateAsap(final StandardSyncInput syncInput) {
return syncInput.getCommitStateAsap() != null && syncInput.getCommitStateAsap();
}

/**
* Helper function to read the shouldCommitStatsAsap feature flag.
*/
static boolean shouldCommitStatsAsap(final StandardSyncInput syncInput) {
// For consistency, we should only be committing stats early if we are committing states early.
// Otherwise, we are risking stats discrepancy as we are committing stats for states that haven't
// been persisted yet.
return shouldCommitStateAsap(syncInput) && syncInput.getCommitStatsAsap() != null && syncInput.getCommitStatsAsap();
}

/**
* Helper function to read the status of the {@link HandleStreamStatus} feature flag once at the
* start of the replication exection.
*
* @param replicationContext The {@link ReplicationContext} of the replication.
* @return The result of checking the status of the {@link HandleStreamStatus} feature flag.
*/
private boolean shouldHandleStreamStatus(final ReplicationContext replicationContext) {
return featureFlagClient.boolVariation(HandleStreamStatus.INSTANCE, new Workspace(replicationContext.workspaceId()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private static HeartbeatTimeoutChaperone createHeartbeatTimeoutChaperone(final H
private static MessageTracker createMessageTracker(final SyncPersistence syncPersistence,
final FeatureFlags featureFlags,
final StandardSyncInput syncInput) {
final boolean commitStatsAsap = DefaultReplicationWorker.shouldCommitStatsAsap(syncInput);
final boolean commitStatsAsap = ReplicationFeatureFlagReader.shouldCommitStatsAsap(syncInput);
final MessageTracker messageTracker =
commitStatsAsap ? new AirbyteMessageTracker(syncPersistence, featureFlags) : new AirbyteMessageTracker(featureFlags);
return messageTracker;
Expand Down Expand Up @@ -209,7 +209,7 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou
metricReporter,
connectorConfigUpdater,
heartbeatTimeoutChaperone,
featureFlagClient,
new ReplicationFeatureFlagReader(featureFlagClient),
airbyteMessageDataExtractor,
replicationEventPublishingHelper);
}
Expand All @@ -221,7 +221,7 @@ private static SyncPersistence createSyncPersistence(final SyncPersistenceFactor
final StandardSyncInput syncInput,
final IntegrationLauncherConfig sourceLauncherConfig) {
// TODO clean up the feature flag init once commitStates and commitStats have been rolled out
final boolean commitStatesAsap = DefaultReplicationWorker.shouldCommitStateAsap(syncInput);
final boolean commitStatesAsap = ReplicationFeatureFlagReader.shouldCommitStateAsap(syncInput);
final SyncPersistence syncPersistence = commitStatesAsap
? syncPersistenceFactory.get(syncInput.getConnectionId(), Long.parseLong(sourceLauncherConfig.getJobId()),
sourceLauncherConfig.getAttemptId().intValue(), syncInput.getCatalog())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.workers.context.ReplicationContext;
import io.airbyte.workers.context.ReplicationFeatureFlags;
import io.airbyte.workers.helper.AirbyteMessageDataExtractor;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.FieldSelector;
Expand Down Expand Up @@ -48,6 +49,8 @@ class ReplicationWorkerHelper {
private long recordsRead;
private StreamDescriptor currentDestinationStream = null;
private StreamDescriptor currentSourceStream = null;
private ReplicationContext replicationContext = null;
private ReplicationFeatureFlags replicationFeatureFlags = null;

public ReplicationWorkerHelper(
final AirbyteMessageDataExtractor airbyteMessageDataExtractor,
Expand All @@ -68,20 +71,55 @@ public ReplicationWorkerHelper(
this.recordsRead = 0L;
}

public void initialize(final ReplicationContext replicationContext, final ReplicationFeatureFlags replicationFeatureFlags) {
this.replicationContext = replicationContext;
this.replicationFeatureFlags = replicationFeatureFlags;
}

public void beforeReplication(final ConfiguredAirbyteCatalog catalog) {
fieldSelector.populateFields(catalog);
}

public void endOfSource(final ReplicationContext replicationContext) {
public void endOfReplication() {
// Publish a complete status event for all streams associated with the connection.
// This is to ensure that all streams end up in a complete state and is necessary for
// connections with destinations that do not emit messages to trigger the completion.
replicationAirbyteMessageEventPublishingHelper.publishCompleteStatusEvent(new StreamDescriptor(), replicationContext,
AirbyteMessageOrigin.INTERNAL);
}

public void endOfSource() {
LOGGER.info("Total records read: {} ({})", recordsRead,
FileUtils.byteCountToDisplaySize(messageTracker.getSyncStatsTracker().getTotalBytesEmitted()));

fieldSelector.reportMetrics(replicationContext.sourceId());
}

public Optional<AirbyteMessage> processMessageFromSource(final AirbyteMessage airbyteMessage,
final ReplicationContext replicationContext,
final boolean handleStreamStatus) {
public void endOfDestination() {
// Publish the completed state for the last stream, if present
final StreamDescriptor currentStream = getCurrentDestinationStream();
if (replicationFeatureFlags.shouldHandleStreamStatus() && currentStream != null) {
replicationAirbyteMessageEventPublishingHelper.publishCompleteStatusEvent(currentStream, replicationContext,
AirbyteMessageOrigin.DESTINATION);
}
}

public void handleSourceFailure() {
if (replicationFeatureFlags.shouldHandleStreamStatus()) {
replicationAirbyteMessageEventPublishingHelper.publishIncompleteStatusEvent(getCurrentSourceStream(),
replicationContext, AirbyteMessageOrigin.SOURCE);
}
}

public void handleDestinationFailure() {
if (replicationFeatureFlags.shouldHandleStreamStatus()) {
replicationAirbyteMessageEventPublishingHelper.publishIncompleteStatusEvent(getCurrentSourceStream(),
replicationContext, AirbyteMessageOrigin.DESTINATION);
}

}

public Optional<AirbyteMessage> processMessageFromSource(final AirbyteMessage airbyteMessage) {
final StreamDescriptor previousStream = currentSourceStream;
currentSourceStream = airbyteMessageDataExtractor.extractStreamDescriptor(airbyteMessage, previousStream);
if (currentSourceStream != null) {
Expand All @@ -95,7 +133,7 @@ public Optional<AirbyteMessage> processMessageFromSource(final AirbyteMessage ai

messageTracker.acceptFromSource(message);

if (handleStreamStatus && shouldPublishMessage(airbyteMessage)) {
if (replicationFeatureFlags.shouldHandleStreamStatus() && shouldPublishMessage(airbyteMessage)) {
LOGGER.debug("Publishing source event for stream {}:{}...", currentSourceStream.getNamespace(), currentSourceStream.getName());
replicationAirbyteMessageEventPublishingHelper
.publishStatusEvent(new ReplicationAirbyteMessageEvent(AirbyteMessageOrigin.SOURCE, message, replicationContext));
Expand All @@ -119,10 +157,7 @@ public Optional<AirbyteMessage> processMessageFromSource(final AirbyteMessage ai
return Optional.of(message);
}

public void processMessageFromDestination(final AirbyteMessage message,
final boolean commitStatesAsap,
final boolean handleStreamStatus,
final ReplicationContext replicationContext) {
public void processMessageFromDestination(final AirbyteMessage message) {
LOGGER.info("State in DefaultReplicationWorker from destination: {}", message);
final StreamDescriptor previousStream = currentDestinationStream;
currentDestinationStream = airbyteMessageDataExtractor.extractStreamDescriptor(message, previousStream);
Expand All @@ -132,17 +167,17 @@ public void processMessageFromDestination(final AirbyteMessage message,

// If the worker has moved on to the next stream, ensure that a completed status is sent
// for the previously tracked stream.
if (handleStreamStatus && previousStream != null && !previousStream.equals(currentDestinationStream)) {
if (replicationFeatureFlags.shouldHandleStreamStatus() && previousStream != null && !previousStream.equals(currentDestinationStream)) {
replicationAirbyteMessageEventPublishingHelper.publishCompleteStatusEvent(currentDestinationStream, replicationContext,
AirbyteMessageOrigin.DESTINATION);
}

messageTracker.acceptFromDestination(message);
if (commitStatesAsap && message.getType() == Type.STATE) {
if (replicationFeatureFlags.shouldCommitStateAsap() && message.getType() == Type.STATE) {
syncPersistence.persist(replicationContext.connectionId(), message.getState());
}

if (handleStreamStatus && shouldPublishMessage(message)) {
if (replicationFeatureFlags.shouldHandleStreamStatus() && shouldPublishMessage(message)) {
LOGGER.debug("Publishing destination event for stream {}:{}...", currentDestinationStream.getNamespace(), currentDestinationStream.getName());
replicationAirbyteMessageEventPublishingHelper
.publishStatusEvent(new ReplicationAirbyteMessageEvent(AirbyteMessageOrigin.DESTINATION, message, replicationContext));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ private ReplicationWorker getDefaultReplicationWorker(final boolean fieldSelecti
workerMetricReporter,
connectorConfigUpdater,
heartbeatTimeoutChaperone,
featureFlagClient,
new ReplicationFeatureFlagReader(featureFlagClient),
airbyteMessageDataExtractor,
replicationAirbyteMessageEventPublishingHelper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public static void executeOneSync() throws InterruptedException {
metricReporter,
connectorConfigUpdater,
heartbeatTimeoutChaperone,
featureFlagClient,
new ReplicationFeatureFlagReader(featureFlagClient),
airbyteMessageDataExtractor,
replicationAirbyteMessageEventPublishingHelper);
final AtomicReference<ReplicationOutput> output = new AtomicReference<>();
Expand Down

0 comments on commit e49c8b4

Please sign in to comment.