Skip to content

Commit

Permalink
Proof of concept parallel source stream reading implementation for My…
Browse files Browse the repository at this point in the history
…SQL (#26580)

* Proof of concept parallel source stream reading implementation for MySQL

* Automated Change

* Add read method that supports concurrent execution to Source interface

* Remove parallel iterator

* Ensure that executor service is stopped

* Automated Commit - Format and Process Resources Changes

* Expose method to fix compilation issue

* Use concurrent map to avoid access issues

* Automated Commit - Format and Process Resources Changes

* Ensure concurrent streams finish before closing source

* Fix compile issue

* Formatting

* Exclude concurrent stream threads from orphan thread watcher

* Automated Commit - Format and Process Resources Changes

* Refactor orphaned thread logic to account for concurrent execution

* PR feedback

* Implement readStreams in wrapper source

* Automated Commit - Format and Process Resources Changes

* Add readStream override

* Automated Commit - Format and Process Resources Changes

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* Debug logging

* Reduce logging level

* Replace synchronized calls to System.out.println when concurrent

* Close consumer

* Flush before close

* Automated Commit - Format and Process Resources Changes

* Remove charset

* Use ASCII and flush periodically for parallel streams

* Test performance harness patch

* Automated Commit - Format and Process Resources Changes

* Cleanup

* Logging to identify concurrent read enabled

* Mark parameter as final

---------

Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: rodireich <rodireich@users.noreply.github.com>
  • Loading branch information
5 people authored Aug 3, 2023
1 parent 2f7deae commit 549e36f
Show file tree
Hide file tree
Showing 22 changed files with 1,391 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,11 @@ public class EnvVariableFeatureFlags implements FeatureFlags {
// Set this value to true to see all messages from the source to destination, set to one second
// emission
public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES";
public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION";
public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";

public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES";

/**
 * Reports whether failing connections should be disabled automatically.
 *
 * The value is read from the {@code AUTO_DISABLE_FAILING_CONNECTIONS} environment variable and
 * parsed with {@link Boolean#parseBoolean(String)}, so an unset variable yields {@code false}. The
 * resolved value is logged at info level on every call.
 *
 * @return {@code true} only if the environment variable is set to "true" (case-insensitive).
 */
@Override
public boolean autoDisablesFailingConnections() {
  final boolean autoDisable = Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"));
  log.info("Auto Disable Failing Connections: " + autoDisable);
  return autoDisable;
}

/**
 * Reports whether the secret store migration should be forced.
 *
 * The value is read from the {@code FORCE_MIGRATE_SECRET_STORE} environment variable and parsed
 * with {@link Boolean#parseBoolean(String)}, so an unset variable yields {@code false}.
 *
 * @return {@code true} only if the environment variable is set to "true" (case-insensitive).
 */
@Override
public boolean forceSecretMigration() {
  final String rawFlag = System.getenv("FORCE_MIGRATE_SECRET_STORE");
  return Boolean.parseBoolean(rawFlag);
}
public static final String CONCURRENT_SOURCE_STREAM_READ = "CONCURRENT_SOURCE_STREAM_READ";
public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES";
public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG";

@Override
public boolean useStreamCapableState() {
Expand All @@ -50,8 +39,8 @@ public boolean logConnectorMessages() {
}

@Override
public boolean needStateValidation() {
return getEnvOrDefault(NEED_STATE_VALIDATION, true, Boolean::parseBoolean);
public boolean concurrentSourceStreamRead() {
return getEnvOrDefault(CONCURRENT_SOURCE_STREAM_READ, false, Boolean::parseBoolean);
}

@Override
Expand All @@ -66,12 +55,12 @@ public String fieldSelectionWorkspaces() {

@Override
public String strictComparisonNormalizationWorkspaces() {
return "";
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg);
}

@Override
public String strictComparisonNormalizationTag() {
return "";
return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison2", (arg) -> arg);
}

// TODO: refactor in order to use the same method as the one in EnvConfigs.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,13 @@
*/
public interface FeatureFlags {

boolean autoDisablesFailingConnections();

boolean forceSecretMigration();

boolean useStreamCapableState();

boolean autoDetectSchema();

boolean logConnectorMessages();

boolean needStateValidation();
boolean concurrentSourceStreamRead();

/**
* Return true if field selection should be applied. See also fieldSelectionWorkspaces.
Expand All @@ -39,7 +35,7 @@ public interface FeatureFlags {

/**
* Get the workspaces allow-listed for strict incremental comparison in normalization. This takes
* precedence over the normalization version in oss_registry.json .
* precedence over the normalization version in destination_definitions.yaml.
*
* @return a comma-separated list of workspace ids where strict incremental comparison should be
* enabled in normalization.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.stream;

import io.airbyte.commons.util.AirbyteStreamAware;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for emitting stream status updates ({@link AirbyteStreamStatus#STARTED},
 * {@link AirbyteStreamStatus#RUNNING}, {@link AirbyteStreamStatus#COMPLETE} and
 * {@link AirbyteStreamStatus#INCOMPLETE}) through an optional status emitter.
 *
 * Each status has three overloads that funnel into one another: the
 * {@link AutoCloseableIterator} overload forwards to the {@link AirbyteStreamAware} overload, which
 * forwards to the {@link Optional} stream identifier overload, which performs the emission.
 */
public class StreamStatusUtils {

  private static final Logger LOGGER = LoggerFactory.getLogger(StreamStatusUtils.class);

  /**
   * Wraps a record collector so that a {@link AirbyteStreamStatus#RUNNING} status is emitted once,
   * right after the first message has been handed to the delegate.
   *
   * The emission happens in a {@code finally} block, so it is attempted even when the delegate
   * throws while handling that first message.
   *
   * @param stream The stream whose messages the returned {@link Consumer} will receive; used to
   *        identify the stream in the emitted status.
   * @param delegateRecordCollector The {@link Consumer} that actually processes each message.
   * @param streamStatusEmitter The optional {@link Consumer} that receives stream status updates.
   * @return A {@link Consumer} that forwards every message to the delegate and reports the
   *         {@link AirbyteStreamStatus#RUNNING} status after the first one.
   */
  public static Consumer<AirbyteMessage> statusTrackingRecordCollector(final AutoCloseableIterator<AirbyteMessage> stream,
                                                                       final Consumer<AirbyteMessage> delegateRecordCollector,
                                                                       final Optional<Consumer<AirbyteStreamStatusHolder>> streamStatusEmitter) {
    return new RunningStatusRecordCollector(stream, delegateRecordCollector, streamStatusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#RUNNING} status for the given iterator. Does nothing when the
   * iterator is not an {@link AirbyteStreamAware} instance.
   *
   * @param airbyteStream The iterator identifying the stream.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitRunningStreamStatus(final AutoCloseableIterator<AirbyteMessage> airbyteStream,
                                             final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (!(airbyteStream instanceof AirbyteStreamAware)) {
      return;
    }
    // The typed local selects the AirbyteStreamAware overload rather than this one.
    final AirbyteStreamAware streamAware = (AirbyteStreamAware) airbyteStream;
    emitRunningStreamStatus(streamAware, statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#RUNNING} status for the stream reported by the given
   * {@link AirbyteStreamAware}.
   *
   * @param airbyteStream The object that knows which stream it belongs to.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitRunningStreamStatus(final AirbyteStreamAware airbyteStream,
                                             final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    emitRunningStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#RUNNING} status for the given stream identifier. Does nothing
   * when the identifier is empty.
   *
   * @param airbyteStream The optional stream identifier.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitRunningStreamStatus(final Optional<AirbyteStreamNameNamespacePair> airbyteStream,
                                             final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (airbyteStream.isPresent()) {
      final AirbyteStreamNameNamespacePair streamId = airbyteStream.get();
      LOGGER.debug("RUNNING -> {}", streamId);
      emitStreamStatus(streamId, AirbyteStreamStatus.RUNNING, statusEmitter);
    }
  }

  /**
   * Emits a {@link AirbyteStreamStatus#STARTED} status for the given iterator. Does nothing when the
   * iterator is not an {@link AirbyteStreamAware} instance.
   *
   * @param airbyteStream The iterator identifying the stream.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitStartStreamStatus(final AutoCloseableIterator<AirbyteMessage> airbyteStream,
                                           final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (!(airbyteStream instanceof AirbyteStreamAware)) {
      return;
    }
    // The typed local selects the AirbyteStreamAware overload rather than this one.
    final AirbyteStreamAware streamAware = (AirbyteStreamAware) airbyteStream;
    emitStartStreamStatus(streamAware, statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#STARTED} status for the stream reported by the given
   * {@link AirbyteStreamAware}.
   *
   * @param airbyteStream The object that knows which stream it belongs to.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitStartStreamStatus(final AirbyteStreamAware airbyteStream,
                                           final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    emitStartStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#STARTED} status for the given stream identifier. Does nothing
   * when the identifier is empty.
   *
   * @param airbyteStream The optional stream identifier.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitStartStreamStatus(final Optional<AirbyteStreamNameNamespacePair> airbyteStream,
                                           final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (airbyteStream.isPresent()) {
      final AirbyteStreamNameNamespacePair streamId = airbyteStream.get();
      LOGGER.debug("STARTING -> {}", streamId);
      emitStreamStatus(streamId, AirbyteStreamStatus.STARTED, statusEmitter);
    }
  }

  /**
   * Emits a {@link AirbyteStreamStatus#COMPLETE} status for the given iterator. Does nothing when
   * the iterator is not an {@link AirbyteStreamAware} instance.
   *
   * @param airbyteStream The iterator identifying the stream.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitCompleteStreamStatus(final AutoCloseableIterator<AirbyteMessage> airbyteStream,
                                              final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (!(airbyteStream instanceof AirbyteStreamAware)) {
      return;
    }
    // The typed local selects the AirbyteStreamAware overload rather than this one.
    final AirbyteStreamAware streamAware = (AirbyteStreamAware) airbyteStream;
    emitCompleteStreamStatus(streamAware, statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#COMPLETE} status for the stream reported by the given
   * {@link AirbyteStreamAware}.
   *
   * @param airbyteStream The object that knows which stream it belongs to.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitCompleteStreamStatus(final AirbyteStreamAware airbyteStream,
                                              final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    emitCompleteStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#COMPLETE} status for the given stream identifier. Does
   * nothing when the identifier is empty.
   *
   * @param airbyteStream The optional stream identifier.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitCompleteStreamStatus(final Optional<AirbyteStreamNameNamespacePair> airbyteStream,
                                              final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (airbyteStream.isPresent()) {
      final AirbyteStreamNameNamespacePair streamId = airbyteStream.get();
      LOGGER.debug("COMPLETE -> {}", streamId);
      emitStreamStatus(streamId, AirbyteStreamStatus.COMPLETE, statusEmitter);
    }
  }

  /**
   * Emits a {@link AirbyteStreamStatus#INCOMPLETE} status for the given iterator. Does nothing when
   * the iterator is not an {@link AirbyteStreamAware} instance.
   *
   * @param airbyteStream The iterator identifying the stream.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitIncompleteStreamStatus(final AutoCloseableIterator<AirbyteMessage> airbyteStream,
                                                final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (!(airbyteStream instanceof AirbyteStreamAware)) {
      return;
    }
    // The typed local selects the AirbyteStreamAware overload rather than this one.
    final AirbyteStreamAware streamAware = (AirbyteStreamAware) airbyteStream;
    emitIncompleteStreamStatus(streamAware, statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#INCOMPLETE} status for the stream reported by the given
   * {@link AirbyteStreamAware}.
   *
   * @param airbyteStream The object that knows which stream it belongs to.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitIncompleteStreamStatus(final AirbyteStreamAware airbyteStream,
                                                final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    emitIncompleteStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter);
  }

  /**
   * Emits a {@link AirbyteStreamStatus#INCOMPLETE} status for the given stream identifier. Does
   * nothing when the identifier is empty.
   *
   * @param airbyteStream The optional stream identifier.
   * @param statusEmitter The optional stream status emitter.
   */
  public static void emitIncompleteStreamStatus(final Optional<AirbyteStreamNameNamespacePair> airbyteStream,
                                                final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (airbyteStream.isPresent()) {
      final AirbyteStreamNameNamespacePair streamId = airbyteStream.get();
      LOGGER.debug("INCOMPLETE -> {}", streamId);
      emitStreamStatus(streamId, AirbyteStreamStatus.INCOMPLETE, statusEmitter);
    }
  }

  /**
   * Hands a new {@link AirbyteStreamStatusHolder} for the given stream and status to the emitter.
   * Does nothing when no emitter is present.
   *
   * @param airbyteStreamNameNamespacePair The stream identifier.
   * @param airbyteStreamStatus The status update.
   * @param statusEmitter The optional stream status emitter.
   */
  private static void emitStreamStatus(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair,
                                       final AirbyteStreamStatus airbyteStreamStatus,
                                       final Optional<Consumer<AirbyteStreamStatusHolder>> statusEmitter) {
    if (statusEmitter.isPresent()) {
      final AirbyteStreamStatusHolder statusHolder = new AirbyteStreamStatusHolder(airbyteStreamNameNamespacePair, airbyteStreamStatus);
      statusEmitter.get().accept(statusHolder);
    }
  }

  /**
   * {@link Consumer} returned by
   * {@link #statusTrackingRecordCollector(AutoCloseableIterator, Consumer, Optional)}. It forwards
   * each message to the delegate and emits the {@link AirbyteStreamStatus#RUNNING} status after the
   * first one. The first-message flag is a plain, unsynchronized field.
   */
  private static final class RunningStatusRecordCollector implements Consumer<AirbyteMessage> {

    private final AutoCloseableIterator<AirbyteMessage> stream;
    private final Consumer<AirbyteMessage> delegateRecordCollector;
    private final Optional<Consumer<AirbyteStreamStatusHolder>> streamStatusEmitter;

    // True until the RUNNING status has been emitted for the first accepted message.
    private boolean firstRead = true;

    private RunningStatusRecordCollector(final AutoCloseableIterator<AirbyteMessage> stream,
                                         final Consumer<AirbyteMessage> delegateRecordCollector,
                                         final Optional<Consumer<AirbyteStreamStatusHolder>> streamStatusEmitter) {
      this.stream = stream;
      this.delegateRecordCollector = delegateRecordCollector;
      this.streamStatusEmitter = streamStatusEmitter;
    }

    @Override
    public void accept(final AirbyteMessage airbyteMessage) {
      try {
        delegateRecordCollector.accept(airbyteMessage);
      } finally {
        // Runs whether or not the delegate threw; the flag is cleared only after the emission.
        if (firstRead) {
          emitRunningStreamStatus(stream, streamStatusEmitter);
          firstRead = false;
        }
      }
    }

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
*
* @param <T> type
*/
public interface AutoCloseableIterator<T> extends Iterator<T>, AutoCloseable {}
public interface AutoCloseableIterator<T> extends Iterator<T>, AutoCloseable, AirbyteStreamAware {}
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,18 @@ public static <T> CompositeIterator<T> concatWithEagerClose(final AutoCloseableI
return concatWithEagerClose(List.of(iterators), null);
}

/**
 * Builds a {@link CompositeIterator} that reads from the provided iterators in a serial fashion.
 *
 * @param iterators The list of iterators to be read one after another.
 * @param airbyteStreamStatusConsumer The stream status consumer used to report stream status during
 *        iteration. The overloads of this method that take no status consumer pass {@code null}
 *        here.
 * @return A {@link CompositeIterator} constructed from the two arguments.
 * @param <T> The type of data contained in each iterator.
 */
public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators,
                                                            final Consumer<AirbyteStreamStatusHolder> airbyteStreamStatusConsumer) {
  final CompositeIterator<T> compositeIterator = new CompositeIterator<>(iterators, airbyteStreamStatusConsumer);
  return compositeIterator;
}

/**
 * Builds a {@link CompositeIterator} over the provided iterators without a stream status consumer.
 * Delegates to the two-argument overload, passing {@code null} as the consumer.
 *
 * @param iterators The list of iterators to combine.
 * @return A {@link CompositeIterator}.
 * @param <T> The type of data contained in each iterator.
 */
public static <T> CompositeIterator<T> concatWithEagerClose(final List<AutoCloseableIterator<T>> iterators) {
  final CompositeIterator<T> compositeIterator = concatWithEagerClose(iterators, null);
  return compositeIterator;
}

}
Loading

0 comments on commit 549e36f

Please sign in to comment.