From 549e36f156788158a8b0180330fa55e31c134f4e Mon Sep 17 00:00:00 2001 From: Jonathan Pearlin Date: Thu, 3 Aug 2023 14:23:52 -0400 Subject: [PATCH] Proof of concept parallel source stream reading implementation for MySQL (#26580) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Proof of concept parallel source stream reading implementation for MySQL * Automated Change * Add read method that supports concurrent execution to Source interface * Remove parallel iterator * Ensure that executor service is stopped * Automated Commit - Format and Process Resources Changes * Expose method to fix compilation issue * Use concurrent map to avoid access issues * Automated Commit - Format and Process Resources Changes * Ensure concurrent streams finish before closing source * Fix compile issue * Formatting * Exclude concurrent stream threads from orphan thread watcher * Automated Commit - Format and Process Resources Changes * Refactor orphaned thread logic to account for concurrent execution * PR feedback * Implement readStreams in wrapper source * Automated Commit - Format and Process Resources Changes * Add readStream override * Automated Commit - Format and Process Resources Changes * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * 🤖 Auto format source-mysql code [skip ci] * Debug logging * Reduce logging level * Replace synchronized calls to System.out.println when concurrent * Close consumer * Flush before close * Automated Commit - Format and Process Resources Changes * Remove charset * Use ASCII and flush periodically for parallel streams * Test performance harness patch * Automated Commit - Format and Process Resources Changes * Cleanup * Logging to identify concurrent read enabled * Mark parameter as final --------- Co-authored-by: jdpgrailsdev Co-authored-by: octavia-squidington-iii Co-authored-by: 
Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Co-authored-by: rodireich --- .../features/EnvVariableFeatureFlags.java | 25 +- .../commons/features/FeatureFlags.java | 8 +- .../commons/stream/StreamStatusUtils.java | 226 ++++++++ .../commons/util/AutoCloseableIterator.java | 2 +- .../commons/util/AutoCloseableIterators.java | 13 +- .../commons/util/CompositeIterator.java | 49 +- .../util/DefaultAutoCloseableIterator.java | 2 +- .../util/LazyAutoCloseableIterator.java | 2 +- .../commons/stream/StreamStatusUtilsTest.java | 498 ++++++++++++++++++ .../util/AutoCloseableIteratorsTest.java | 11 +- .../integrations/base/IntegrationRunner.java | 215 +++++--- .../io/airbyte/integrations/base/Source.java | 19 + .../SpecModifyingSource.java | 7 + .../base/ssh/SshWrappedSource.java | 14 + .../concurrent/ConcurrentStreamConsumer.java | 237 +++++++++ .../base/IntegrationRunnerTest.java | 36 +- .../ConcurrentStreamConsumerTest.java | 126 +++++ .../source/jdbc/AbstractJdbcSource.java | 3 +- .../connectors/source-mysql/build.gradle | 4 + .../source/mysql/MySqlSource.java | 46 ++ .../source/relationaldb/AbstractDbSource.java | 23 +- .../relationaldb/state/CursorManager.java | 3 +- 22 files changed, 1391 insertions(+), 178 deletions(-) create mode 100644 airbyte-commons/src/main/java/io/airbyte/commons/stream/StreamStatusUtils.java create mode 100644 airbyte-commons/src/test/java/io/airbyte/commons/stream/StreamStatusUtilsTest.java create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java create mode 100644 airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumerTest.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 879e0010595e..9b64edc1f99c 100644 --- 
a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -17,22 +17,11 @@ public class EnvVariableFeatureFlags implements FeatureFlags { // Set this value to true to see all messages from the source to destination, set to one second // emission public static final String LOG_CONNECTOR_MESSAGES = "LOG_CONNECTOR_MESSAGES"; - public static final String NEED_STATE_VALIDATION = "NEED_STATE_VALIDATION"; public static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION"; - public static final String FIELD_SELECTION_WORKSPACES = "FIELD_SELECTION_WORKSPACES"; - - @Override - public boolean autoDisablesFailingConnections() { - log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"))); - - return Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")); - } - - @Override - public boolean forceSecretMigration() { - return Boolean.parseBoolean(System.getenv("FORCE_MIGRATE_SECRET_STORE")); - } + public static final String CONCURRENT_SOURCE_STREAM_READ = "CONCURRENT_SOURCE_STREAM_READ"; + public static final String STRICT_COMPARISON_NORMALIZATION_WORKSPACES = "STRICT_COMPARISON_NORMALIZATION_WORKSPACES"; + public static final String STRICT_COMPARISON_NORMALIZATION_TAG = "STRICT_COMPARISON_NORMALIZATION_TAG"; @Override public boolean useStreamCapableState() { @@ -50,8 +39,8 @@ public boolean logConnectorMessages() { } @Override - public boolean needStateValidation() { - return getEnvOrDefault(NEED_STATE_VALIDATION, true, Boolean::parseBoolean); + public boolean concurrentSourceStreamRead() { + return getEnvOrDefault(CONCURRENT_SOURCE_STREAM_READ, false, Boolean::parseBoolean); } @Override @@ -66,12 +55,12 @@ public String fieldSelectionWorkspaces() { @Override public String strictComparisonNormalizationWorkspaces() { - return ""; + return 
getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_WORKSPACES, "", (arg) -> arg); } @Override public String strictComparisonNormalizationTag() { - return ""; + return getEnvOrDefault(STRICT_COMPARISON_NORMALIZATION_TAG, "strict_comparison2", (arg) -> arg); } // TODO: refactor in order to use the same method than the ones in EnvConfigs.java diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java index f03dc46d8dd2..b3da9ac764bb 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/FeatureFlags.java @@ -10,17 +10,13 @@ */ public interface FeatureFlags { - boolean autoDisablesFailingConnections(); - - boolean forceSecretMigration(); - boolean useStreamCapableState(); boolean autoDetectSchema(); boolean logConnectorMessages(); - boolean needStateValidation(); + boolean concurrentSourceStreamRead(); /** * Return true if field selection should be applied. See also fieldSelectionWorkspaces. @@ -39,7 +35,7 @@ public interface FeatureFlags { /** * Get the workspaces allow-listed for strict incremental comparison in normalization. This takes - * precedence over the normalization version in oss_registry.json . + * precedence over the normalization version in destination_definitions.yaml. * * @return a comma-separated list of workspace ids where strict incremental comparison should be * enabled in normalization. diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/stream/StreamStatusUtils.java b/airbyte-commons/src/main/java/io/airbyte/commons/stream/StreamStatusUtils.java new file mode 100644 index 000000000000..f4f748bef993 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/stream/StreamStatusUtils.java @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.commons.stream; + +import io.airbyte.commons.util.AirbyteStreamAware; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus; +import java.util.Optional; +import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collection of utility methods that support the generation of stream status updates. + */ +public class StreamStatusUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(StreamStatusUtils.class); + + /** + * Creates a new {@link Consumer} that wraps the provided {@link Consumer} with stream status + * reporting capabilities. Specifically, this consumer will emit an + * {@link AirbyteStreamStatus#RUNNING} status after the first message is consumed by the delegated + * {@link Consumer}. + * + * @param stream The stream from which the delegating {@link Consumer} will consume messages for + * processing. + * @param delegateRecordCollector The delegated {@link Consumer} that will be called when this + * consumer accepts a message for processing. + * @param streamStatusEmitter The optional {@link Consumer} that will be used to emit stream status + * updates. + * @return A wrapping {@link Consumer} that provides stream status updates when the provided + * delegate {@link Consumer} is invoked. 
+ */ + public static Consumer statusTrackingRecordCollector(final AutoCloseableIterator stream, + final Consumer delegateRecordCollector, + final Optional> streamStatusEmitter) { + return new Consumer<>() { + + private boolean firstRead = true; + + @Override + public void accept(final AirbyteMessage airbyteMessage) { + try { + delegateRecordCollector.accept(airbyteMessage); + } finally { + if (firstRead) { + emitRunningStreamStatus(stream, streamStatusEmitter); + firstRead = false; + } + } + } + + }; + } + + /** + * Emits a {@link AirbyteStreamStatus#RUNNING} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitRunningStreamStatus(final AutoCloseableIterator airbyteStream, + final Optional> statusEmitter) { + if (airbyteStream instanceof AirbyteStreamAware) { + emitRunningStreamStatus((AirbyteStreamAware) airbyteStream, statusEmitter); + } + } + + /** + * Emits a {@link AirbyteStreamStatus#RUNNING} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitRunningStreamStatus(final AirbyteStreamAware airbyteStream, + final Optional> statusEmitter) { + emitRunningStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter); + } + + /** + * Emits a {@link AirbyteStreamStatus#RUNNING} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. 
+ */ + public static void emitRunningStreamStatus(final Optional airbyteStream, + final Optional> statusEmitter) { + airbyteStream.ifPresent(s -> { + LOGGER.debug("RUNNING -> {}", s); + emitStreamStatus(s, AirbyteStreamStatus.RUNNING, statusEmitter); + }); + } + + /** + * Emits a {@link AirbyteStreamStatus#STARTED} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitStartStreamStatus(final AutoCloseableIterator airbyteStream, + final Optional> statusEmitter) { + if (airbyteStream instanceof AirbyteStreamAware) { + emitStartStreamStatus((AirbyteStreamAware) airbyteStream, statusEmitter); + } + } + + /** + * Emits a {@link AirbyteStreamStatus#STARTED} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitStartStreamStatus(final AirbyteStreamAware airbyteStream, + final Optional> statusEmitter) { + emitStartStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter); + } + + /** + * Emits a {@link AirbyteStreamStatus#STARTED} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitStartStreamStatus(final Optional airbyteStream, + final Optional> statusEmitter) { + airbyteStream.ifPresent(s -> { + LOGGER.debug("STARTING -> {}", s); + emitStreamStatus(s, AirbyteStreamStatus.STARTED, statusEmitter); + }); + } + + /** + * Emits a {@link AirbyteStreamStatus#COMPLETE} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. 
+ * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitCompleteStreamStatus(final AutoCloseableIterator airbyteStream, + final Optional> statusEmitter) { + if (airbyteStream instanceof AirbyteStreamAware) { + emitCompleteStreamStatus((AirbyteStreamAware) airbyteStream, statusEmitter); + } + } + + /** + * Emits a {@link AirbyteStreamStatus#COMPLETE} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitCompleteStreamStatus(final AirbyteStreamAware airbyteStream, + final Optional> statusEmitter) { + emitCompleteStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter); + } + + /** + * Emits a {@link AirbyteStreamStatus#COMPLETE} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitCompleteStreamStatus(final Optional airbyteStream, + final Optional> statusEmitter) { + airbyteStream.ifPresent(s -> { + LOGGER.debug("COMPLETE -> {}", s); + emitStreamStatus(s, AirbyteStreamStatus.COMPLETE, statusEmitter); + }); + } + + /** + * Emits a {@link AirbyteStreamStatus#INCOMPLETE} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitIncompleteStreamStatus(final AutoCloseableIterator airbyteStream, + final Optional> statusEmitter) { + if (airbyteStream instanceof AirbyteStreamAware) { + emitIncompleteStreamStatus((AirbyteStreamAware) airbyteStream, statusEmitter); + } + } + + /** + * Emits a {@link AirbyteStreamStatus#INCOMPLETE} stream status for the provided stream. 
+ * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitIncompleteStreamStatus(final AirbyteStreamAware airbyteStream, + final Optional> statusEmitter) { + emitIncompleteStreamStatus(airbyteStream.getAirbyteStream(), statusEmitter); + } + + /** + * Emits a {@link AirbyteStreamStatus#INCOMPLETE} stream status for the provided stream. + * + * @param airbyteStream The stream that should be associated with the stream status. + * @param statusEmitter The {@link Optional} stream status emitter. + */ + public static void emitIncompleteStreamStatus(final Optional airbyteStream, + final Optional> statusEmitter) { + airbyteStream.ifPresent(s -> { + LOGGER.debug("INCOMPLETE -> {}", s); + emitStreamStatus(s, AirbyteStreamStatus.INCOMPLETE, statusEmitter); + }); + } + + /** + * Emits a stream status for the provided stream. + * + * @param airbyteStreamNameNamespacePair The stream identifier. + * @param airbyteStreamStatus The status update. + * @param statusEmitter The {@link Optional} stream status emitter. 
+ */ + private static void emitStreamStatus(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, + final AirbyteStreamStatus airbyteStreamStatus, + final Optional> statusEmitter) { + statusEmitter.ifPresent(consumer -> consumer.accept(new AirbyteStreamStatusHolder(airbyteStreamNameNamespacePair, airbyteStreamStatus))); + } + +} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterator.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterator.java index 06949c479fb2..ccbc11e10a11 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterator.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterator.java @@ -12,4 +12,4 @@ * * @param type */ -public interface AutoCloseableIterator extends Iterator, AutoCloseable {} +public interface AutoCloseableIterator extends Iterator, AutoCloseable, AirbyteStreamAware {} diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java index 3011704ffbcb..e5d304b5edca 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java @@ -191,13 +191,18 @@ public static CompositeIterator concatWithEagerClose(final AutoCloseableI return concatWithEagerClose(List.of(iterators), null); } + /** + * Creates a {@link CompositeIterator} that reads from the provided iterators in a serial fashion. + * + * @param iterators The list of iterators to be used in a serial fashion. + * @param airbyteStreamStatusConsumer The stream status consumer used to report stream status during + * iteration. + * @return A {@link CompositeIterator}. + * @param The type of data contained in each iterator. 
+ */ public static CompositeIterator concatWithEagerClose(final List> iterators, final Consumer airbyteStreamStatusConsumer) { return new CompositeIterator<>(iterators, airbyteStreamStatusConsumer); } - public static CompositeIterator concatWithEagerClose(final List> iterators) { - return concatWithEagerClose(iterators, null); - } - } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/CompositeIterator.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/CompositeIterator.java index 2f92c1b68e92..7c5997d344c6 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/CompositeIterator.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/CompositeIterator.java @@ -7,8 +7,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; import io.airbyte.commons.stream.AirbyteStreamStatusHolder; +import io.airbyte.commons.stream.StreamStatusUtils; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; -import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -37,7 +37,7 @@ * * @param type */ -public final class CompositeIterator extends AbstractIterator implements AutoCloseableIterator, AirbyteStreamAware { +public final class CompositeIterator extends AbstractIterator implements AutoCloseableIterator { private static final Logger LOGGER = LoggerFactory.getLogger(CompositeIterator.class); @@ -72,15 +72,15 @@ protected T computeNext() { while (!currentIterator().hasNext()) { try { currentIterator().close(); - emitCompleteStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitCompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); } catch (final Exception e) { - emitIncompleteStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitIncompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); throw new RuntimeException(e); } if (i + 1 < 
iterators.size()) { i++; - emitStartStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitStartStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); firstRead = true; } else { return endOfData(); @@ -89,15 +89,15 @@ protected T computeNext() { try { if (isFirstStream()) { - emitStartStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitStartStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); } return currentIterator().next(); } catch (final RuntimeException e) { - emitIncompleteStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitIncompleteStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); throw e; } finally { if (firstRead) { - emitRunningStreamStatus(getAirbyteStream()); + StreamStatusUtils.emitRunningStreamStatus(getAirbyteStream(), airbyteStreamStatusConsumer); firstRead = false; } } @@ -143,37 +143,4 @@ private void assertHasNotClosed() { Preconditions.checkState(!hasClosed); } - private void emitRunningStreamStatus(final Optional airbyteStream) { - airbyteStream.ifPresent(s -> { - LOGGER.info("RUNNING -> {}", s); - emitStreamStatus(s, AirbyteStreamStatus.RUNNING); - }); - } - - private void emitStartStreamStatus(final Optional airbyteStream) { - airbyteStream.ifPresent(s -> { - LOGGER.info("STARTING -> {}", s); - emitStreamStatus(s, AirbyteStreamStatus.STARTED); - }); - } - - private void emitCompleteStreamStatus(final Optional airbyteStream) { - airbyteStream.ifPresent(s -> { - LOGGER.info("COMPLETE -> {}", s); - emitStreamStatus(s, AirbyteStreamStatus.COMPLETE); - }); - } - - private void emitIncompleteStreamStatus(final Optional airbyteStream) { - airbyteStream.ifPresent(s -> { - LOGGER.info("COMPLETE -> {}", s); - emitStreamStatus(s, AirbyteStreamStatus.INCOMPLETE); - }); - } - - private void emitStreamStatus(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, - final AirbyteStreamStatus airbyteStreamStatus) { - airbyteStreamStatusConsumer.ifPresent(c -> c.accept(new 
AirbyteStreamStatusHolder(airbyteStreamNameNamespacePair, airbyteStreamStatus))); - } - } diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultAutoCloseableIterator.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultAutoCloseableIterator.java index e6051910c023..effd09566e37 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultAutoCloseableIterator.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/DefaultAutoCloseableIterator.java @@ -17,7 +17,7 @@ * * @param type */ -class DefaultAutoCloseableIterator extends AbstractIterator implements AutoCloseableIterator, AirbyteStreamAware { +class DefaultAutoCloseableIterator extends AbstractIterator implements AutoCloseableIterator { private final AirbyteStreamNameNamespacePair airbyteStream; private final Iterator iterator; diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/LazyAutoCloseableIterator.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/LazyAutoCloseableIterator.java index 5479e63ac333..77fcbeb51308 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/LazyAutoCloseableIterator.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/LazyAutoCloseableIterator.java @@ -20,7 +20,7 @@ * * @param type */ -class LazyAutoCloseableIterator extends AbstractIterator implements AutoCloseableIterator, AirbyteStreamAware { +class LazyAutoCloseableIterator extends AbstractIterator implements AutoCloseableIterator { private final Supplier> iteratorSupplier; diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/stream/StreamStatusUtilsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/stream/StreamStatusUtilsTest.java new file mode 100644 index 000000000000..5ddbbd2ed288 --- /dev/null +++ b/airbyte-commons/src/test/java/io/airbyte/commons/stream/StreamStatusUtilsTest.java @@ -0,0 +1,498 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.commons.stream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.airbyte.commons.util.AirbyteStreamAware; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus; +import java.util.Optional; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Test suite for the {@link StreamStatusUtils} class. 
+ */ +@ExtendWith(MockitoExtension.class) +class StreamStatusUtilsTest { + + private static final String NAME = "name"; + private static final String NAMESPACE = "namespace"; + + @Captor + private ArgumentCaptor airbyteStreamStatusHolderArgumentCaptor; + + @Test + void testCreateStreamStatusConsumerWrapper() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Optional> streamStatusEmitter = Optional.empty(); + final Consumer messageConsumer = mock(Consumer.class); + + final Consumer wrappedMessageConsumer = + StreamStatusUtils.statusTrackingRecordCollector(stream, messageConsumer, streamStatusEmitter); + + assertNotEquals(messageConsumer, wrappedMessageConsumer); + } + + @Test + void testStreamStatusConsumerWrapperProduceStreamStatus() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + final Consumer messageConsumer = mock(Consumer.class); + final AirbyteMessage airbyteMessage = mock(AirbyteMessage.class); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + final Consumer wrappedMessageConsumer = + StreamStatusUtils.statusTrackingRecordCollector(stream, messageConsumer, streamStatusEmitter); + + assertNotEquals(messageConsumer, wrappedMessageConsumer); + + wrappedMessageConsumer.accept(airbyteMessage); + wrappedMessageConsumer.accept(airbyteMessage); + wrappedMessageConsumer.accept(airbyteMessage); + + verify(messageConsumer, times(3)).accept(any()); + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.RUNNING, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitRunningStreamStatusIterator() { + final 
AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.RUNNING, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitRunningStreamStatusIteratorEmptyAirbyteStream() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitRunningStreamStatusIteratorEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitRunningStreamStatusAirbyteStreamAware() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter 
= mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.RUNNING, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitRunningStreamStatusAirbyteStreamAwareEmptyStream() { + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitRunningStreamStatusAirbyteStreamAwareEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitRunningStreamStatusAirbyteStream() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + StreamStatusUtils.emitRunningStreamStatus(Optional.of(airbyteStream), streamStatusEmitter); + + verify(statusEmitter, 
times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.RUNNING, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitRunningStreamStatusEmptyAirbyteStream() { + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(Optional.empty(), streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitRunningStreamStatusAirbyteStreamEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Optional> streamStatusEmitter = Optional.empty(); + + assertDoesNotThrow(() -> StreamStatusUtils.emitRunningStreamStatus(Optional.of(airbyteStream), streamStatusEmitter)); + } + + @Test + void testEmitStartedStreamStatusIterator() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.STARTED, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitStartedStreamStatusIteratorEmptyAirbyteStream() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = 
Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitStartedStreamStatusIteratorEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitStartedStreamStatusAirbyteStreamAware() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.STARTED, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitStartedStreamStatusAirbyteStreamAwareEmptyStream() { + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)); + 
verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitStartedStreamStatusAirbyteStreamAwareEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitStartedStreamStatusAirbyteStream() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + StreamStatusUtils.emitStartStreamStatus(Optional.of(airbyteStream), streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.STARTED, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitStartedStreamStatusEmptyAirbyteStream() { + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(Optional.empty(), streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitStartedStreamStatusAirbyteStreamEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Optional> streamStatusEmitter = Optional.empty(); + + assertDoesNotThrow(() -> StreamStatusUtils.emitStartStreamStatus(Optional.of(airbyteStream), streamStatusEmitter)); + } + + 
@Test + void testEmitCompleteStreamStatusIterator() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.COMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitCompleteStreamStatusIteratorEmptyAirbyteStream() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitCompleteStreamStatusIteratorEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitCompleteStreamStatusAirbyteStreamAware() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware 
stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.COMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitCompleteStreamStatusAirbyteStreamAwareEmptyStream() { + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitCompleteStreamStatusAirbyteStreamAwareEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitCompleteStreamStatusAirbyteStream() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + StreamStatusUtils.emitCompleteStreamStatus(Optional.of(airbyteStream), streamStatusEmitter); + + 
verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.COMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitCompleteStreamStatusEmptyAirbyteStream() { + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(Optional.empty(), streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitCompleteStreamStatusAirbyteStreamEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Optional> streamStatusEmitter = Optional.empty(); + + assertDoesNotThrow(() -> StreamStatusUtils.emitCompleteStreamStatus(Optional.of(airbyteStream), streamStatusEmitter)); + } + + @Test + void testEmitIncompleteStreamStatusIterator() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.INCOMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitIncompleteStreamStatusIteratorEmptyAirbyteStream() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer statusEmitter = mock(Consumer.class); + final 
Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitIncompleteStreamStatusIteratorEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitIncompleteStreamStatusAirbyteStreamAware() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.INCOMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitIncompleteStreamStatusAirbyteStreamAwareEmptyStream() { + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + when(stream.getAirbyteStream()).thenReturn(Optional.empty()); + + assertDoesNotThrow(() -> 
StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitIncompleteStreamStatusAirbyteStreamAwareEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final AirbyteStreamAware stream = mock(AirbyteStreamAware.class); + final Optional> streamStatusEmitter = Optional.empty(); + + when(stream.getAirbyteStream()).thenReturn(Optional.of(airbyteStream)); + + assertDoesNotThrow(() -> StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter)); + } + + @Test + void testEmitIncompleteStreamStatusAirbyteStream() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + StreamStatusUtils.emitIncompleteStreamStatus(Optional.of(airbyteStream), streamStatusEmitter); + + verify(statusEmitter, times(1)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + assertEquals(AirbyteStreamStatus.INCOMPLETE, airbyteStreamStatusHolderArgumentCaptor.getValue().toTraceMessage().getStreamStatus().getStatus()); + } + + @Test + void testEmitIncompleteStreamStatusEmptyAirbyteStream() { + final Consumer statusEmitter = mock(Consumer.class); + final Optional> streamStatusEmitter = Optional.of(statusEmitter); + + assertDoesNotThrow(() -> StreamStatusUtils.emitIncompleteStreamStatus(Optional.empty(), streamStatusEmitter)); + verify(statusEmitter, times(0)).accept(airbyteStreamStatusHolderArgumentCaptor.capture()); + } + + @Test + void testEmitIncompleteStreamStatusAirbyteStreamEmptyStatusEmitter() { + final AirbyteStreamNameNamespacePair airbyteStream = new AirbyteStreamNameNamespacePair(NAME, NAMESPACE); + final Optional> streamStatusEmitter = Optional.empty(); + + 
assertDoesNotThrow(() -> StreamStatusUtils.emitIncompleteStreamStatus(Optional.of(airbyteStream), streamStatusEmitter)); + } + +} diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/util/AutoCloseableIteratorsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/util/AutoCloseableIteratorsTest.java index bc4661282d41..3fbb86b2d950 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/util/AutoCloseableIteratorsTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/util/AutoCloseableIteratorsTest.java @@ -12,7 +12,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import io.airbyte.commons.concurrency.VoidCallable; import java.util.Iterator; @@ -73,7 +72,7 @@ void testAppendOnClose() throws Exception { @Test void testTransform() { final Iterator transform = Iterators.transform(MoreIterators.of(1, 2, 3), i -> i + 1); - assertEquals(ImmutableList.of(2, 3, 4), MoreIterators.toList(transform)); + assertEquals(List.of(2, 3, 4), MoreIterators.toList(transform)); } @Test @@ -81,17 +80,17 @@ void testConcatWithEagerClose() throws Exception { final VoidCallable onClose1 = mock(VoidCallable.class); final VoidCallable onClose2 = mock(VoidCallable.class); - final AutoCloseableIterator iterator = new CompositeIterator<>(ImmutableList.of( + final AutoCloseableIterator iterator = new CompositeIterator<>(List.of( AutoCloseableIterators.fromIterator(MoreIterators.of("a", "b"), onClose1, null), AutoCloseableIterators.fromIterator(MoreIterators.of("d"), onClose2, null)), null); - assertOnCloseInvocations(ImmutableList.of(), ImmutableList.of(onClose1, onClose2)); + assertOnCloseInvocations(List.of(), List.of(onClose1, onClose2)); assertNext(iterator, "a"); assertNext(iterator, "b"); assertNext(iterator, "d"); - assertOnCloseInvocations(ImmutableList.of(onClose1), ImmutableList.of(onClose2)); + 
assertOnCloseInvocations(List.of(onClose1), List.of(onClose2)); assertFalse(iterator.hasNext()); - assertOnCloseInvocations(ImmutableList.of(onClose1, onClose2), ImmutableList.of()); + assertOnCloseInvocations(List.of(onClose1, onClose2), List.of()); iterator.close(); diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 80e10a0a25b1..f35a78407842 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -7,14 +7,18 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import datadog.trace.api.Trace; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions.Procedure; +import io.airbyte.commons.stream.StreamStatusUtils; import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.util.ApmTraceUtils; import io.airbyte.integrations.util.ConnectorExceptionUtil; +import io.airbyte.integrations.util.concurrent.ConcurrentStreamConsumer; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteMessage.Type; @@ -24,6 +28,7 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; @@ -31,6 +36,8 @@ import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.commons.lang3.ThreadUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; @@ -45,16 +52,32 @@ public class IntegrationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunner.class); + /** + * Filters threads that should not be considered when looking for orphaned threads at shutdown of + * the integration runner. + *

+ *

+ * N.B. Daemon threads don't block the JVM if the main `currentThread` exits, so they are not + * problematic. The thread evaluating this filter is excluded by name as well; no other threads + * (for example, database connection pool threads) are exempted. + */ + @VisibleForTesting + static final Predicate ORPHANED_THREAD_FILTER = runningThread -> !runningThread.getName().equals(Thread.currentThread().getName()) + && !runningThread.isDaemon(); + public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60; public static final int EXIT_THREAD_DELAY_MINUTES = 70; public static final int FORCED_EXIT_CODE = 2; + private static final Runnable EXIT_HOOK = () -> System.exit(FORCED_EXIT_CODE); + private final IntegrationCliParser cliParser; private final Consumer outputRecordCollector; private final Integration integration; private final Destination destination; private final Source source; + private final FeatureFlags featureFlags; private static JsonSchemaValidator validator; public IntegrationRunner(final Destination destination) { @@ -77,6 +100,7 @@ public IntegrationRunner(final Source source) { integration = source != null ?
source : destination; this.source = source; this.destination = destination; + this.featureFlags = new EnvVariableFeatureFlags(); validator = new JsonSchemaValidator(); Thread.setDefaultUncaughtExceptionHandler(new AirbyteExceptionHandler()); @@ -136,8 +160,17 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); - try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { - produceMessages(messageIterator); + try { + if (featureFlags.concurrentSourceStreamRead()) { + LOGGER.info("Concurrent source stream read enabled."); + readConcurrent(config, catalog, stateOptional); + } else { + readSerial(config, catalog, stateOptional); + } + } finally { + if (source instanceof AutoCloseable) { + ((AutoCloseable) source).close(); + } } } // destination only @@ -148,19 +181,15 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { DestinationConfig.initialize(config); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final Procedure consumeWriteStreamCallable = () -> { - try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { - consumeWriteStream(consumer); - } - }; - - watchForOrphanThreads( - consumeWriteStreamCallable, - () -> System.exit(FORCED_EXIT_CODE), - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + try (final SerializedAirbyteMessageConsumer consumer = destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)) { + consumeWriteStream(consumer); + } finally { + 
stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } @@ -197,14 +226,66 @@ private void runInternal(final IntegrationConfig parsed) throws Exception { LOGGER.info("Completed integration: {}", integration.getClass().getName()); } - private void produceMessages(final AutoCloseableIterator messageIterator) throws Exception { - watchForOrphanThreads( - () -> messageIterator.forEachRemaining(outputRecordCollector), - () -> System.exit(FORCED_EXIT_CODE), - INTERRUPT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES, - EXIT_THREAD_DELAY_MINUTES, - TimeUnit.MINUTES); + private void produceMessages(final AutoCloseableIterator messageIterator, final Consumer recordCollector) { + messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Producing messages for stream {}...", s)); + messageIterator.forEachRemaining(recordCollector); + messageIterator.getAirbyteStream().ifPresent(s -> LOGGER.debug("Finished producing messages for stream {}...", s)); + } + + private void readConcurrent(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional stateOptional) throws Exception { + final Collection> streams = source.readStreams(config, catalog, stateOptional.orElse(null)); + + try (final ConcurrentStreamConsumer streamConsumer = new ConcurrentStreamConsumer(this::consumeFromStream, streams.size())) { + /* + * Break the streams into partitions equal to the number of concurrent streams supported by the + * stream consumer.
+ */ + final Integer partitionSize = streamConsumer.getParallelism(); + final List>> partitions = Lists.partition(streams.stream().toList(), + partitionSize); + + // Submit each stream partition for concurrent execution + partitions.forEach(partition -> { + streamConsumer.accept(partition); + }); + + // Check for any exceptions that were raised during the concurrent execution + if (streamConsumer.getException().isPresent()) { + throw streamConsumer.getException().get(); + } + } catch (final Exception e) { + LOGGER.error("Unable to perform concurrent read.", e); + throw e; + } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } + } + + private void readSerial(final JsonNode config, ConfiguredAirbyteCatalog catalog, final Optional stateOptional) throws Exception { + try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { + produceMessages(messageIterator, outputRecordCollector); + } finally { + stopOrphanedThreads(EXIT_HOOK, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } + } + + private void consumeFromStream(final AutoCloseableIterator stream) { + try { + final Consumer streamStatusTrackingRecordConsumer = StreamStatusUtils.statusTrackingRecordCollector(stream, + outputRecordCollector, Optional.of(AirbyteTraceMessageUtility::emitStreamStatusTrace)); + produceMessages(stream, streamStatusTrackingRecordConsumer); + } catch (final Exception e) { + stream.getAirbyteStream().ifPresent(s -> LOGGER.error("Failed to consume from stream {}.", s, e)); + throw new RuntimeException(e); + } } @VisibleForTesting @@ -249,60 +330,58 @@ static void consumeWriteStream(final SerializedAirbyteMessageConsumer consumer, } /** - * This method calls a runMethod and make sure that it won't produce orphan non-daemon active - * threads once it is done. 
Active non-daemon threads blocks JVM from exiting when the main thread - * is done, whereas daemon ones don't. + * Stops any non-daemon threads that could block the JVM from exiting when the main thread is done. *

* If any active non-daemon threads would be left as orphans, this method will schedule some * interrupt/exit hooks after giving it some time delay to close up properly. It is generally * preferred to have a proper closing sequence from children threads instead of interrupting or * force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs * for maintainers to fix the code behavior instead. + * + * @param exitHook The {@link Runnable} exit hook to execute for any orphaned threads. + * @param interruptTimeDelay The time to delay execution of the orphaned thread interrupt attempt. + * @param interruptTimeUnit The time unit of the interrupt delay. + * @param exitTimeDelay The time to delay execution of the orphaned thread exit hook. + * @param exitTimeUnit The time unit of the exit delay. */ @VisibleForTesting - static void watchForOrphanThreads(final Procedure runMethod, - final Runnable exitHook, - final int interruptTimeDelay, - final TimeUnit interruptTimeUnit, - final int exitTimeDelay, - final TimeUnit exitTimeUnit) - throws Exception { + static void stopOrphanedThreads(final Runnable exitHook, + final int interruptTimeDelay, + final TimeUnit interruptTimeUnit, + final int exitTimeDelay, + final TimeUnit exitTimeUnit) { final Thread currentThread = Thread.currentThread(); - try { - runMethod.call(); - } finally { - final List runningThreads = ThreadUtils.getAllThreads() - .stream() - // daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic - .filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon()) - .toList(); - if (!runningThreads.isEmpty()) { - LOGGER.warn(""" - The main thread is exiting while children non-daemon threads from a connector are still active. - Ideally, this situation should not happen... - Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead. 
- The main thread is: {}""", dumpThread(currentThread)); - final ScheduledExecutorService scheduledExecutorService = Executors - .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder() - // this thread executor will create daemon threads, so it does not block exiting if all other active - // threads are already stopped. - .daemon(true).build()); - for (final Thread runningThread : runningThreads) { - final String str = "Active non-daemon thread: " + dumpThread(runningThread); - LOGGER.warn(str); - // even though the main thread is already shutting down, we still leave some chances to the children - // threads to close properly on their own. - // So, we schedule an interrupt hook after a fixed time delay instead... - scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit); - } - scheduledExecutorService.schedule(() -> { - if (ThreadUtils.getAllThreads().stream() - .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) { - LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n"); - exitHook.run(); - } - }, exitTimeDelay, exitTimeUnit); + + final List runningThreads = ThreadUtils.getAllThreads() + .stream() + .filter(ORPHANED_THREAD_FILTER) + .collect(Collectors.toList()); + if (!runningThreads.isEmpty()) { + LOGGER.warn(""" + The main thread is exiting while children non-daemon threads from a connector are still active. + Ideally, this situation should not happen... + Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead. + The main thread is: {}""", dumpThread(currentThread)); + final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder() + // this thread executor will create daemon threads, so it does not block exiting if all other active + // threads are already stopped. 
+ .daemon(true).build()); + for (final Thread runningThread : runningThreads) { + final String str = "Active non-daemon thread: " + dumpThread(runningThread); + LOGGER.warn(str); + // even though the main thread is already shutting down, we still leave some chances to the children + // threads to close properly on their own. + // So, we schedule an interrupt hook after a fixed time delay instead... + scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit); } + scheduledExecutorService.schedule(() -> { + if (ThreadUtils.getAllThreads().stream() + .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) { + LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n"); + exitHook.run(); + } + }, exitTimeDelay, exitTimeUnit); } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java index f391ed2d2347..424bd780e5b3 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/Source.java @@ -9,6 +9,8 @@ import io.airbyte.protocol.models.v0.AirbyteCatalog; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import java.util.Collection; +import java.util.List; public interface Source extends Integration { @@ -36,4 +38,21 @@ public interface Source extends Integration { */ AutoCloseableIterator read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception; + /** + * Returns a collection of iterators of messages pulled from the source, each representing a + * "stream". + * + * @param config - integration-specific configuration object as json. e.g. 
{ "username": "airbyte", + * "password": "super secure" } + * @param catalog - schema of the incoming messages. + * @param state - state of the incoming messages. + * @return The collection of {@link AutoCloseableIterator} instances that produce messages for each + * configured "stream" + * @throws Exception - any exception + */ + default Collection> readStreams(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) + throws Exception { + return List.of(read(config, catalog, state)); + } + } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java index f7cfef4df5af..a0e26a5bcc9c 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -12,6 +12,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConnectorSpecification; +import java.util.Collection; /** * In some cases we want to prune or mutate the spec for an existing source. 
The common case is that @@ -49,4 +50,10 @@ public AutoCloseableIterator read(final JsonNode config, final C return source.read(config, catalog, state); } + @Override + public Collection> readStreams(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) + throws Exception { + return source.readStreams(config, catalog, state); + } + } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java index bb3b7de21fe2..08971e9ec768 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java @@ -15,6 +15,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConnectorSpecification; +import java.util.Collection; import java.util.List; import java.util.Optional; import org.slf4j.Logger; @@ -80,4 +81,17 @@ public AutoCloseableIterator read(final JsonNode config, final C return AutoCloseableIterators.appendOnClose(delegateRead, tunnel::close); } + @Override + public Collection> readStreams(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) + throws Exception { + final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey); + try { + return delegate.readStreams(tunnel.getConfigInTunnel(), catalog, state); + } catch (final Exception e) { + LOGGER.error("Exception occurred while getting the delegate read stream iterators, closing SSH tunnel", e); + tunnel.close(); + throw e; + } + } + } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java 
b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java new file mode 100644 index 000000000000..7d9bdb15ead7 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumer.java @@ -0,0 +1,237 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.util.concurrent; + +import io.airbyte.commons.stream.AirbyteStreamStatusHolder; +import io.airbyte.commons.stream.StreamStatusUtils; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link Consumer} implementation that consumes {@link AirbyteMessage} records from each provided + * stream concurrently. + *

+ *

+ * The consumer calculates the parallelism based on the provided requested parallelism. If the + * requested parallelism is greater than zero, the minimum value between the requested parallelism + * and the maximum number of allowed threads is chosen as the parallelism value. Otherwise, the + * minimum parallelism value is selected. This is to avoid issues with attempting to execute with a + * parallelism value of zero, which is not allowed by the underlying {@link ExecutorService}. + *

+ *

+ * This consumer will capture any raised exceptions during execution of each stream. Any exceptions + * are stored and made available by calling the {@link #getException()} method. + */ +public class ConcurrentStreamConsumer implements Consumer>>, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentStreamConsumer.class); + + /** + * Name of threads spawned by the {@link ConcurrentStreamConsumer}. + */ + public static final String CONCURRENT_STREAM_THREAD_NAME = "concurrent-stream-thread"; + + private final ExecutorService executorService; + private final List exceptions; + private final Integer parallelism; + private final Consumer> streamConsumer; + private final Optional> streamStatusEmitter = + Optional.of(AirbyteTraceMessageUtility::emitStreamStatusTrace); + + /** + * Constructs a new {@link ConcurrentStreamConsumer} that will use the provided stream consumer to + * execute each stream submitted to the {@link #accept(Collection)} method of + * this consumer. Streams submitted to the {@link #accept(Collection)} method + * will be converted to a {@link Runnable} and executed on an {@link ExecutorService} configured by + * this consumer to ensure concurrent execution of each stream. + * + * @param streamConsumer The {@link Consumer} that accepts streams as an + * {@link AutoCloseableIterator}. + * @param requestedParallelism The requested amount of parallelism that will be used as a hint to + * determine the appropriate number of threads to execute concurrently. 
+ */ + public ConcurrentStreamConsumer(final Consumer> streamConsumer, final Integer requestedParallelism) { + this.parallelism = computeParallelism(requestedParallelism); + this.executorService = createExecutorService(parallelism); + this.exceptions = new ArrayList<>(); + this.streamConsumer = streamConsumer; + } + + @Override + public void accept(final Collection> streams) { + /* + * Submit the provided streams to the underlying executor service for concurrent execution. This + * thread will track the status of each stream as well as consuming all messages produced from each + * stream, passing them to the provided message consumer for further processing. Any exceptions + * raised within the thread will be captured and exposed to the caller. + */ + final Collection> futures = streams.stream() + .map(stream -> new ConcurrentStreamRunnable(stream, this)) + .map(runnable -> CompletableFuture.runAsync(runnable, executorService)) + .collect(Collectors.toList()); + + /* + * Wait for the submitted streams to complete before returning. This uses the join() method to allow + * all streams to complete even if one or more encounters an exception. + */ + LOGGER.debug("Waiting for all streams to complete...."); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join(); + LOGGER.debug("Completed consuming from all streams."); + } + + /** + * Returns the first captured {@link Exception}. + * + * @return The first captured {@link Exception} or an empty {@link Optional} if no exceptions were + * captured during execution. + */ + public Optional getException() { + if (!exceptions.isEmpty()) { + return Optional.of(exceptions.get(0)); + } else { + return Optional.empty(); + } + } + + /** + * Returns the list of exceptions captured during execution of the streams, if any. + * + * @return The collection of captured exceptions or an empty list. 
+ */ + public List getExceptions() { + return Collections.unmodifiableList(exceptions); + } + + /** + * Returns the parallelism value that will be used by this consumer to execute the consumption of + * data from the provided streams in parallel. + * + * @return The parallelism value of this consumer. + */ + public Integer getParallelism() { + return computeParallelism(parallelism); + } + + /** + * Calculates the parallelism based on the requested parallelism. If the requested parallelism is + * greater than zero, the minimum value between the parallelism and the maximum parallelism is + * chosen as the parallelism count. Otherwise, the minimum parallelism is selected. This is to avoid + * issues with attempting to create an executor service with a thread pool size of 0, which is not + * allowed. + * + * @param requestedParallelism The requested parallelism. + * @return The selected parallelism based on the factors outlined above. + */ + private Integer computeParallelism(final Integer requestedParallelism) { + /* + * Selects the default thread pool size based on the provided value via an environment variable or + * the number of available processors if the environment variable is not set/present. This is to + * ensure that we do not over-parallelize unless requested explicitly. + */ + final Integer defaultPoolSize = Optional.ofNullable(System.getenv("DEFAULT_CONCURRENT_STREAM_CONSUMER_THREADS")) + .map(Integer::parseInt) + .orElseGet(() -> Runtime.getRuntime().availableProcessors()); + LOGGER.debug("Default parallelism: {}, Requested parallelism: {}", defaultPoolSize, requestedParallelism); + final Integer parallelism = Math.min(defaultPoolSize, requestedParallelism > 0 ? requestedParallelism : 1); + LOGGER.debug("Computed concurrent stream consumer parallelism: {}", parallelism); + return parallelism; + } + + /** + * Creates the {@link ExecutorService} that will be used by the consumer to consume from the + * provided streams in parallel. 
+ * + * @param nThreads The number of threads to execute concurrently. + * @return The configured {@link ExecutorService}. + */ + private ExecutorService createExecutorService(final Integer nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), + new ConcurrentStreamThreadFactory(), new AbortPolicy()); + } + + /** + * Executes the stream by providing it to the configured {@link #streamConsumer}. + * + * @param stream The stream to be executed. + */ + private void executeStream(final AutoCloseableIterator stream) { + try (stream) { + stream.getAirbyteStream().ifPresent(s -> LOGGER.debug("Consuming from stream {}...", s)); + StreamStatusUtils.emitStartStreamStatus(stream, streamStatusEmitter); + streamConsumer.accept(stream); + StreamStatusUtils.emitCompleteStreamStatus(stream, streamStatusEmitter); + stream.getAirbyteStream().ifPresent(s -> LOGGER.debug("Consumption from stream {} complete.", s)); + } catch (final Exception e) { + stream.getAirbyteStream().ifPresent(s -> LOGGER.error("Unable to consume from stream {}.", s, e)); + StreamStatusUtils.emitIncompleteStreamStatus(stream, streamStatusEmitter); + exceptions.add(e); + } + } + + @Override + public void close() throws Exception { + // Block waiting for the executor service to close + executorService.shutdownNow(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + } + + /** + * Custom {@link ThreadFactory} that names the threads used to concurrently execute streams. 
+ */ + private static class ConcurrentStreamThreadFactory implements ThreadFactory { + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r); + if (r instanceof ConcurrentStreamRunnable) { + final AutoCloseableIterator stream = ((ConcurrentStreamRunnable) r).stream(); + if (stream.getAirbyteStream().isPresent()) { + final AirbyteStreamNameNamespacePair airbyteStream = stream.getAirbyteStream().get(); + thread.setName(String.format("%s-%s-%s", CONCURRENT_STREAM_THREAD_NAME, airbyteStream.getNamespace(), airbyteStream.getName())); + } else { + thread.setName(CONCURRENT_STREAM_THREAD_NAME); + } + } else { + thread.setName(CONCURRENT_STREAM_THREAD_NAME); + } + return thread; + } + + } + + /** + * Custom {@link Runnable} that exposes the stream for thread naming purposes. + * + * @param stream The stream that is part of the {@link Runnable} execution. + * @param consumer The {@link ConcurrentStreamConsumer} that will execute the stream. + */ + private record ConcurrentStreamRunnable(AutoCloseableIterator stream, ConcurrentStreamConsumer consumer) implements Runnable { + + @Override + public void run() { + consumer.executeStream(stream); + } + + } + +} diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index fbfbae6d13f6..8f2aaf57615c 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.base; +import static io.airbyte.integrations.base.IntegrationRunner.ORPHANED_THREAD_FILTER; import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE; import static 
org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; @@ -371,24 +372,20 @@ void testDestinationConsumerLifecycleFailure() throws Exception { } @Test - void testInterruptOrphanThreadFailure() { - final String testName = Thread.currentThread().getName(); + void testInterruptOrphanThread() { final List caughtExceptions = new ArrayList<>(); startSleepingThread(caughtExceptions, false); - assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( - () -> { - throw new IOException("random error"); - }, + IntegrationRunner.stopOrphanedThreads( Assertions::fail, 3, TimeUnit.SECONDS, - 10, TimeUnit.SECONDS)); + 10, TimeUnit.SECONDS); try { TimeUnit.SECONDS.sleep(15); } catch (final Exception e) { throw new RuntimeException(e); } final List runningThreads = ThreadUtils.getAllThreads().stream() - .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName)) + .filter(ORPHANED_THREAD_FILTER) .collect(Collectors.toList()); // all threads should be interrupted assertEquals(List.of(), runningThreads); @@ -396,26 +393,23 @@ void testInterruptOrphanThreadFailure() { } @Test - void testNoInterruptOrphanThreadFailure() { - final String testName = Thread.currentThread().getName(); + void testNoInterruptOrphanThread() { final List caughtExceptions = new ArrayList<>(); final AtomicBoolean exitCalled = new AtomicBoolean(false); startSleepingThread(caughtExceptions, true); - assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( - () -> { - throw new IOException("random error"); - }, + IntegrationRunner.stopOrphanedThreads( () -> exitCalled.set(true), 3, TimeUnit.SECONDS, - 10, TimeUnit.SECONDS)); + 10, TimeUnit.SECONDS); try { TimeUnit.SECONDS.sleep(15); } catch (final Exception e) { throw new RuntimeException(e); } + final List runningThreads = ThreadUtils.getAllThreads().stream() - .filter(runningThread -> 
!runningThread.isDaemon() && !runningThread.getName().equals(testName)) - .toList(); + .filter(ORPHANED_THREAD_FILTER) + .collect(Collectors.toList()); // a thread that refuses to be interrupted should remain assertEquals(1, runningThreads.size()); assertEquals(1, caughtExceptions.size()); @@ -423,7 +417,13 @@ void testNoInterruptOrphanThreadFailure() { } private void startSleepingThread(final List caughtExceptions, final boolean ignoreInterrupt) { - final ExecutorService executorService = Executors.newFixedThreadPool(1); + final ExecutorService executorService = Executors.newFixedThreadPool(1, r -> { + // Create a thread that should be identified as orphaned if still running during shutdown + final Thread thread = new Thread(r); + thread.setName("sleeping-thread"); + thread.setDaemon(false); + return thread; + }); executorService.submit(() -> { for (int tries = 0; tries < 3; tries++) { try { diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumerTest.java new file mode 100644 index 000000000000..db9f92492d88 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/util/concurrent/ConcurrentStreamConsumerTest.java @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.util.concurrent; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.node.IntNode; +import com.google.common.collect.Lists; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; + +/** + * Test suite for the {@link ConcurrentStreamConsumer} class. 
+ */ +class ConcurrentStreamConsumerTest { + + private static final String NAME = "name"; + private static final String NAMESPACE = "namespace"; + + @Test + void testAcceptMessage() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer> streamConsumer = mock(Consumer.class); + + final ConcurrentStreamConsumer concurrentStreamConsumer = new ConcurrentStreamConsumer(streamConsumer, 1); + + assertDoesNotThrow(() -> concurrentStreamConsumer.accept(List.of(stream))); + + verify(streamConsumer, times(1)).accept(stream); + } + + @Test + void testAcceptMessageWithException() { + final AutoCloseableIterator stream = mock(AutoCloseableIterator.class); + final Consumer> streamConsumer = mock(Consumer.class); + final Exception e = new NullPointerException("test"); + + doThrow(e).when(streamConsumer).accept(any()); + + final ConcurrentStreamConsumer concurrentStreamConsumer = new ConcurrentStreamConsumer(streamConsumer, 1); + + assertDoesNotThrow(() -> concurrentStreamConsumer.accept(List.of(stream))); + + verify(streamConsumer, times(1)).accept(stream); + assertTrue(concurrentStreamConsumer.getException().isPresent()); + assertEquals(e, concurrentStreamConsumer.getException().get()); + assertEquals(1, concurrentStreamConsumer.getExceptions().size()); + assertTrue(concurrentStreamConsumer.getExceptions().contains(e)); + } + + @Test + void testAcceptMessageWithMultipleExceptions() { + final AutoCloseableIterator stream1 = mock(AutoCloseableIterator.class); + final AutoCloseableIterator stream2 = mock(AutoCloseableIterator.class); + final AutoCloseableIterator stream3 = mock(AutoCloseableIterator.class); + final Consumer> streamConsumer = mock(Consumer.class); + final Exception e1 = new NullPointerException("test1"); + final Exception e2 = new NullPointerException("test2"); + final Exception e3 = new NullPointerException("test3"); + + doThrow(e1).when(streamConsumer).accept(stream1); + doThrow(e2).when(streamConsumer).accept(stream2); + 
doThrow(e3).when(streamConsumer).accept(stream3); + + final ConcurrentStreamConsumer concurrentStreamConsumer = new ConcurrentStreamConsumer(streamConsumer, 1); + + assertDoesNotThrow(() -> concurrentStreamConsumer.accept(List.of(stream1, stream2, stream3))); + + verify(streamConsumer, times(3)).accept(any(AutoCloseableIterator.class)); + assertTrue(concurrentStreamConsumer.getException().isPresent()); + assertEquals(e1, concurrentStreamConsumer.getException().get()); + assertEquals(3, concurrentStreamConsumer.getExceptions().size()); + assertTrue(concurrentStreamConsumer.getExceptions().contains(e1)); + assertTrue(concurrentStreamConsumer.getExceptions().contains(e2)); + assertTrue(concurrentStreamConsumer.getExceptions().contains(e3)); + } + + @Test + void testMoreStreamsThanAvailableThreads() { + final List baseData = List.of(2, 4, 6, 8, 10, 12, 14, 16, 18, 20); + final List> streams = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair = + new AirbyteStreamNameNamespacePair(String.format("%s_%d", NAME, i), NAMESPACE); + final List messages = new ArrayList<>(); + for (int d : baseData) { + final AirbyteMessage airbyteMessage = mock(AirbyteMessage.class); + final AirbyteRecordMessage recordMessage = mock(AirbyteRecordMessage.class); + when(recordMessage.getData()).thenReturn(new IntNode(d * i)); + when(airbyteMessage.getRecord()).thenReturn(recordMessage); + messages.add(airbyteMessage); + } + streams.add(AutoCloseableIterators.fromIterator(messages.iterator(), airbyteStreamNameNamespacePair)); + } + final Consumer> streamConsumer = mock(Consumer.class); + + final ConcurrentStreamConsumer concurrentStreamConsumer = new ConcurrentStreamConsumer(streamConsumer, streams.size()); + final Integer partitionSize = concurrentStreamConsumer.getParallelism(); + final List>> partitions = Lists.partition(streams.stream().toList(), + partitionSize); + + for (final List> partition : partitions) { + 
assertDoesNotThrow(() -> concurrentStreamConsumer.accept(partition)); + } + + verify(streamConsumer, times(streams.size())).accept(any(AutoCloseableIterator.class)); + } + +} diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 03855a76c06e..74fc4212e71b 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -110,7 +110,8 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba final SyncMode syncMode, final Optional cursorField) { LOGGER.info("Queueing query for table: {}", tableName); - // This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the records matters + // This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the records + // matters // as intermediate state messages are emitted (if the connector emits intermediate state). 
if (syncMode.equals(SyncMode.INCREMENTAL) && getStateEmissionFrequency() > 0) { final String quotedCursorField = enquoteIdentifier(cursorField.get(), getQuoteString()); diff --git a/airbyte-integrations/connectors/source-mysql/build.gradle b/airbyte-integrations/connectors/source-mysql/build.gradle index c646317961cf..31bd84a64635 100644 --- a/airbyte-integrations/connectors/source-mysql/build.gradle +++ b/airbyte-integrations/connectors/source-mysql/build.gradle @@ -40,3 +40,7 @@ dependencies { performanceTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) } + + + + diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 6da5dcd8cd18..f1803e5a6c15 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -46,9 +46,12 @@ import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils; import io.airbyte.integrations.source.jdbc.JdbcSSLConnectionUtils.SslMode; import io.airbyte.integrations.source.mysql.helpers.CdcConfigurationHelper; +import io.airbyte.integrations.source.relationaldb.DbSourceDiscoverUtil; import io.airbyte.integrations.source.relationaldb.TableInfo; import io.airbyte.integrations.source.relationaldb.models.CdcState; +import io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils; import io.airbyte.integrations.source.relationaldb.state.StateManager; +import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; import io.airbyte.integrations.util.HostPortResolver; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.v0.AirbyteCatalog; @@ -62,6 +65,7 @@ import 
java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -70,8 +74,10 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,6 +189,46 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { return catalog; } + @Override + public Collection> readStreams(final JsonNode config, + final ConfiguredAirbyteCatalog catalog, + final JsonNode state) + throws Exception { + final AirbyteStateType supportedStateType = getSupportedStateType(config); + final StateManager stateManager = + StateManagerFactory.createStateManager(supportedStateType, + StateGeneratorUtils.deserializeInitialState(state, featureFlags.useStreamCapableState(), supportedStateType), catalog); + final Instant emittedAt = Instant.now(); + + final JdbcDatabase database = createDatabase(config); + + logPreSyncDebugData(database, catalog); + + final Map>> fullyQualifiedTableNameToInfo = + discoverWithoutSystemTables(database) + .stream() + .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), + Function + .identity())); + + validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database); + + DbSourceDiscoverUtil.logSourceSchemaChange(fullyQualifiedTableNameToInfo, catalog, this::getAirbyteType); + + final List> incrementalIterators = + getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, + emittedAt); + final List> fullRefreshIterators = + getFullRefreshIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, + emittedAt); + final List> iteratorList = Stream + .of(incrementalIterators, 
fullRefreshIterators) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + return iteratorList; + } + @Override public JsonNode toDatabaseConfig(final JsonNode config) { final String encodedDatabaseName = HostPortResolver.encodeValue(config.get(JdbcUtils.DATABASE_KEY).asText()); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index b805587ba2d9..204267aa0304 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -176,10 +176,10 @@ public AutoCloseableIterator read(final JsonNode config, }); } - private void validateCursorFieldForIncrementalTables( - final Map>> tableNameToTable, - final ConfiguredAirbyteCatalog catalog, - final Database database) + protected void validateCursorFieldForIncrementalTables( + final Map>> tableNameToTable, + final ConfiguredAirbyteCatalog catalog, + final Database database) throws SQLException { final List tablesWithInvalidCursor = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { @@ -250,8 +250,7 @@ protected void estimateFullRefreshSyncSize(final Database database, /* no-op */ } - private List>> discoverWithoutSystemTables( - final Database database) + protected List>> discoverWithoutSystemTables(final Database database) throws Exception { final Set systemNameSpaces = getExcludedInternalNameSpaces(); final Set systemViews = getExcludedViews(); @@ -262,12 +261,12 @@ private List>> discoverWithoutSystemTables( Collectors.toList())); } - private List> getFullRefreshIterators( - final Database database, - final 
ConfiguredAirbyteCatalog catalog, - final Map>> tableNameToTable, - final StateManager stateManager, - final Instant emittedAt) { + protected List> getFullRefreshIterators( + final Database database, + final ConfiguredAirbyteCatalog catalog, + final Map>> tableNameToTable, + final StateManager stateManager, + final Instant emittedAt) { return getSelectedIterators( database, catalog, diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java index 21da857e615f..164c7f8091ee 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/CursorManager.java @@ -17,6 +17,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -106,7 +107,7 @@ protected Map createCursorInfoMap( .collect(Collectors.toSet()); allStreamNames.addAll(streamSupplier.get().stream().map(namespacePairFunction).filter(Objects::nonNull).collect(Collectors.toSet())); - final Map localMap = new HashMap<>(); + final Map localMap = new ConcurrentHashMap<>(); final Map pairToState = streamSupplier.get() .stream() .collect(Collectors.toMap(namespacePairFunction, Function.identity()));