Skip to content

Commit

Permalink
chore: make replication worker buffer size configurable (#13749)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Aug 28, 2024
1 parent b4d99c6 commit 6d0bd7c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -86,8 +85,6 @@ public class BufferedReplicationWorker implements ReplicationWorker {
private final Stopwatch processFromDestStopwatch;
private final StreamStatusCompletionTracker streamStatusCompletionTracker;

private static final int sourceMaxBufferSize = 1000;
private static final int destinationMaxBufferSize = 1000;
private static final int executorShutdownGracePeriodInSeconds = 10;

public BufferedReplicationWorker(final String jobId,
Expand All @@ -100,23 +97,8 @@ public BufferedReplicationWorker(final String jobId,
final ReplicationFeatureFlagReader replicationFeatureFlagReader,
final ReplicationWorkerHelper replicationWorkerHelper,
final DestinationTimeoutMonitor destinationTimeoutMonitor,
final StreamStatusCompletionTracker streamStatusCompletionTracker) {
this(jobId, attempt, source, destination, syncPersistence, recordSchemaValidator, srcHeartbeatTimeoutChaperone, replicationFeatureFlagReader,
replicationWorkerHelper, destinationTimeoutMonitor, OptionalInt.empty(), streamStatusCompletionTracker);
}

public BufferedReplicationWorker(final String jobId,
final int attempt,
final AirbyteSource source,
final AirbyteDestination destination,
final SyncPersistence syncPersistence,
final RecordSchemaValidator recordSchemaValidator,
final HeartbeatTimeoutChaperone srcHeartbeatTimeoutChaperone,
final ReplicationFeatureFlagReader replicationFeatureFlagReader,
final ReplicationWorkerHelper replicationWorkerHelper,
final DestinationTimeoutMonitor destinationTimeoutMonitor,
final OptionalInt pollTimeOutDurationForQueue,
final StreamStatusCompletionTracker streamStatusCompletionTracker) {
final StreamStatusCompletionTracker streamStatusCompletionTracker,
final BufferConfiguration bufferConfiguration) {
this.jobId = jobId;
this.attempt = attempt;
this.source = source;
Expand All @@ -127,8 +109,10 @@ public BufferedReplicationWorker(final String jobId,
this.recordSchemaValidator = recordSchemaValidator;
this.syncPersistence = syncPersistence;
this.srcHeartbeatTimeoutChaperone = srcHeartbeatTimeoutChaperone;
this.messagesFromSourceQueue = new ClosableLinkedBlockingQueue<>(sourceMaxBufferSize, pollTimeOutDurationForQueue);
this.messagesForDestinationQueue = new ClosableLinkedBlockingQueue<>(destinationMaxBufferSize, pollTimeOutDurationForQueue);
this.messagesFromSourceQueue =
new ClosableLinkedBlockingQueue<>(bufferConfiguration.getSourceMaxBufferSize(), bufferConfiguration.getPollTimeoutDuration());
this.messagesForDestinationQueue =
new ClosableLinkedBlockingQueue<>(bufferConfiguration.getDestinationMaxBufferSize(), bufferConfiguration.getPollTimeoutDuration());
// readFromSource + processMessage + writeToDestination + readFromDestination +
// source heartbeat + dest timeout monitor + workload heartbeat = 7 threads
this.executors = Executors.newFixedThreadPool(7);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.featureflag.FieldSelectionEnabled;
import io.airbyte.featureflag.Multi;
import io.airbyte.featureflag.RemoveValidationLimit;
import io.airbyte.featureflag.ReplicationBufferOverride;
import io.airbyte.featureflag.ShouldFailSyncOnDestinationTimeout;
import io.airbyte.featureflag.Source;
import io.airbyte.featureflag.SourceDefinition;
Expand Down Expand Up @@ -311,6 +312,11 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou
final StreamStatusCompletionTracker streamStatusCompletionTracker,
final StreamStatusTrackerFactory streamStatusTrackerFactory) {
final Context flagContext = getFeatureFlagContext(replicationInput);

final int bufferSize = featureFlagClient.intVariation(ReplicationBufferOverride.INSTANCE, flagContext);
final BufferConfiguration bufferConfiguration =
bufferSize > 0 ? BufferConfiguration.withBufferSize(bufferSize) : BufferConfiguration.withDefaultConfiguration();

return buildReplicationWorkerInstance(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Expand All @@ -335,7 +341,8 @@ private static ReplicationWorker createReplicationWorker(final AirbyteSource sou
workloadId,
airbyteApiClient,
streamStatusCompletionTracker,
streamStatusTrackerFactory);
streamStatusTrackerFactory,
bufferConfiguration);
}

private static Context getFeatureFlagContext(final ReplicationInput replicationInput) {
Expand Down Expand Up @@ -380,14 +387,16 @@ private static ReplicationWorker buildReplicationWorkerInstance(final String job
final Optional<String> workloadId,
final AirbyteApiClient airbyteApiClient,
final StreamStatusCompletionTracker streamStatusCompletionTracker,
final StreamStatusTrackerFactory streamStatusTrackerFactory) {
final StreamStatusTrackerFactory streamStatusTrackerFactory,
final BufferConfiguration bufferConfiguration) {
final ReplicationWorkerHelper replicationWorkerHelper =
new ReplicationWorkerHelper(fieldSelector, mapper, messageTracker, syncPersistence,
messageEventPublishingHelper, new ThreadedTimeTracker(), onReplicationRunning, workloadApiClient,
workloadEnabled, analyticsMessageTracker, workloadId, airbyteApiClient, streamStatusCompletionTracker, streamStatusTrackerFactory);

return new BufferedReplicationWorker(jobId, attempt, source, destination, syncPersistence, recordSchemaValidator,
srcHeartbeatTimeoutChaperone, replicationFeatureFlagReader, replicationWorkerHelper, destinationTimeout, streamStatusCompletionTracker);
srcHeartbeatTimeoutChaperone, replicationFeatureFlagReader, replicationWorkerHelper, destinationTimeout, streamStatusCompletionTracker,
bufferConfiguration);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.workers.general

import io.airbyte.commons.concurrency.ClosableLinkedBlockingQueue

/**
 * Sizing and polling settings for the queues used by the buffered replication worker.
 *
 * @property sourceMaxBufferSize capacity of the queue holding messages read from the source; must be positive.
 * @property destinationMaxBufferSize capacity of the queue holding messages for the destination; must be positive.
 * @property pollTimeoutDuration queue poll timeout, in seconds.
 * @throws IllegalArgumentException if either buffer size is not strictly positive.
 */
data class BufferConfiguration(
  val sourceMaxBufferSize: Int = DEFAULT_SOURCE_MAX_BUFFER_SIZE,
  val destinationMaxBufferSize: Int = DEFAULT_DESTINATION_MAX_BUFFER_SIZE,
  val pollTimeoutDuration: Int = DEFAULT_POLL_TIME_OUT_DURATION_SECONDS,
) {
  init {
    // The backing LinkedBlockingQueue rejects non-positive capacities with an
    // IllegalArgumentException; fail here instead so the message names the offending setting.
    require(sourceMaxBufferSize > 0) { "sourceMaxBufferSize must be positive, got $sourceMaxBufferSize" }
    require(destinationMaxBufferSize > 0) { "destinationMaxBufferSize must be positive, got $destinationMaxBufferSize" }
  }

  companion object {
    const val DEFAULT_SOURCE_MAX_BUFFER_SIZE = 1000
    const val DEFAULT_DESTINATION_MAX_BUFFER_SIZE = 1000
    const val DEFAULT_POLL_TIME_OUT_DURATION_SECONDS = ClosableLinkedBlockingQueue.DEFAULT_POLL_TIME_OUT_DURATION_SECONDS

    // Helpers for Java due to the lack of named parameters

    /** Returns a configuration using [bufferSize] for both the source and destination buffers. */
    @JvmStatic
    fun withBufferSize(bufferSize: Int): BufferConfiguration =
      BufferConfiguration(sourceMaxBufferSize = bufferSize, destinationMaxBufferSize = bufferSize)

    /** Returns a configuration with default buffer sizes and the given poll timeout, in seconds. */
    @JvmStatic
    fun withPollTimeout(pollTimeoutDuration: Int): BufferConfiguration = BufferConfiguration(pollTimeoutDuration = pollTimeoutDuration)

    /** Returns a configuration where every setting keeps its default value. */
    @JvmStatic
    fun withDefaultConfiguration(): BufferConfiguration = BufferConfiguration()
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.workers.internal.FieldSelector;
import java.util.Optional;
import java.util.OptionalInt;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -41,8 +40,8 @@ replicationAirbyteMessageEventPublishingHelper, new ThreadedTimeTracker(), onRep
replicationFeatureFlagReader,
replicationWorkerHelper,
destinationTimeoutMonitor,
OptionalInt.of(1),
streamStatusCompletionTracker);
streamStatusCompletionTracker,
BufferConfiguration.withPollTimeout(1));
}

// BufferedReplicationWorkerTests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.general.performance;

import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.general.BufferConfiguration;
import io.airbyte.workers.general.BufferedReplicationWorker;
import io.airbyte.workers.general.ReplicationFeatureFlagReader;
import io.airbyte.workers.general.ReplicationWorker;
Expand Down Expand Up @@ -46,7 +47,7 @@ public ReplicationWorker getReplicationWorker(final String jobId,
final StreamStatusCompletionTracker streamStatusCompletionTracker) {
return new BufferedReplicationWorker(jobId, attempt, source, destination, syncPersistence, recordSchemaValidator,
srcHeartbeatTimeoutChaperone, replicationFeatureFlagReader, replicationWorkerHelper, destinationTimeoutMonitor,
streamStatusCompletionTracker);
streamStatusCompletionTracker, BufferConfiguration.withDefaultConfiguration());
}

public static void main(final String[] args) throws IOException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.airbyte.commons.concurrency;

import java.util.OptionalInt;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -17,17 +16,17 @@
public class ClosableLinkedBlockingQueue<T> implements ClosableQueue<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(ClosableLinkedBlockingQueue.class);
private static final int DEFAULT_POLL_TIME_OUT_DURATION_SECONDS = 5;
public static final int DEFAULT_POLL_TIME_OUT_DURATION_SECONDS = 5;
private final BlockingQueue<T> queue;

private final AtomicBoolean closed;
private final ReadWriteLock closedLock;
private final int timeOutDuration;

public ClosableLinkedBlockingQueue(int maxQueueSize, OptionalInt pollTimeOutDurationInSeconds) {
public ClosableLinkedBlockingQueue(int maxQueueSize, int pollTimeOutDurationInSeconds) {
LOGGER.info("Using ClosableLinkedBlockingQueue");
this.queue = new LinkedBlockingQueue<>(maxQueueSize);
this.timeOutDuration = pollTimeOutDurationInSeconds.orElse(DEFAULT_POLL_TIME_OUT_DURATION_SECONDS);
this.timeOutDuration = pollTimeOutDurationInSeconds;
this.closed = new AtomicBoolean();
this.closedLock = new ReentrantReadWriteLock();
}
Expand Down
2 changes: 2 additions & 0 deletions airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,5 @@ object LogStateMsgs : Temporary<Boolean>(key = "platform.log-state-msgs", defaul
object EnableMappers : Temporary<Boolean>(key = "platform.enable-mappers", default = false)

object ReplicationMonoPod : Temporary<Boolean>(key = "replication-mono-pod", default = false)

// Integer override for the replication worker buffer size. The replication worker factory
// applies the value to both the source and destination buffers only when it is greater
// than 0; the default of 0 leaves the built-in buffer configuration in place.
object ReplicationBufferOverride : Temporary<Int>(key = "platform.replication-buffer-override", default = 0)

0 comments on commit 6d0bd7c

Please sign in to comment.