From 6bf0ae71235faf6d2c74c4f1695c78743c1c2792 Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Wed, 7 Feb 2024 14:00:41 -0800 Subject: [PATCH] make exclusive containers first class citizens (#34892) Co-authored-by: Marius Posta --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../commons/logging/LoggingHelper.java | 1 + .../io/airbyte/commons/logging/MdcScope.java | 13 +- .../src/main/resources/version.properties | 2 +- .../cdk/testutils/ContainerFactory.java | 164 +++++++++--------- .../connectors/source-postgres/build.gradle | 4 +- .../postgres/PostgresContainerFactory.java | 9 +- docs/integrations/sources/postgres.md | 1 + 8 files changed, 102 insertions(+), 93 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 8b6a3fa1b85a..a0dd938d8164 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. | | 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic | | 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest | | 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. | diff --git a/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java b/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java index 8250449bc77f..e351ca2eff44 100644 --- a/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java +++ b/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/LoggingHelper.java @@ -22,6 +22,7 @@ public enum Color { YELLOW_BACKGROUND("\u001b[43m"), // destination GREEN_BACKGROUND("\u001b[42m"), // normalization CYAN_BACKGROUND("\u001b[46m"), // container runner + RED_BACKGROUND("\u001b[41m"), // testcontainers PURPLE_BACKGROUND("\u001b[45m"); // dbt private final String ansi; diff --git a/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java b/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java index 21264d559ef3..080ebae80a69 100644 --- a/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java +++ b/airbyte-cdk/java/airbyte-cdk/airbyte-commons/src/main/java/io/airbyte/commons/logging/MdcScope.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.function.BiConsumer; import org.slf4j.MDC; /** @@ -70,22 +71,24 @@ public Builder setSimple(final boolean simple) { return this; } - public MdcScope build() { - final Map extraMdcEntries = new HashMap<>(); - + public void produceMappings(final BiConsumer mdcConsumer) { maybeLogPrefix.ifPresent(logPrefix -> { final String potentiallyColoredLog = maybePrefixColor .map(color -> LoggingHelper.applyColor(color, logPrefix)) .orElse(logPrefix); - extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog); + mdcConsumer.accept(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog); if (simple) { // outputs much less information for this line. see log4j2.xml to see exactly what this does - extraMdcEntries.put("simple", "true"); + mdcConsumer.accept("simple", "true"); } }); + } + public MdcScope build() { + final Map extraMdcEntries = new HashMap<>(); + produceMappings(extraMdcEntries::put); return new MdcScope(extraMdcEntries); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index b7e05db2aecd..f385278d30c4 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.16.5 +version=0.16.6 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java index 4735716dc05e..932a7dda3ac0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/java/io/airbyte/cdk/testutils/ContainerFactory.java @@ -4,115 +4,123 @@ package io.airbyte.cdk.testutils; +import io.airbyte.commons.logging.LoggingHelper; +import io.airbyte.commons.logging.MdcScope; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.JdbcDatabaseContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; /** - * ContainerFactory is the companion interface to {@link TestDatabase} for providing it with - * suitable testcontainer instances. + * ContainerFactory is the companion to {@link TestDatabase} and provides it with suitable + * testcontainer instances. */ -public interface ContainerFactory> { +public abstract class ContainerFactory> { + + static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class); + + private record ContainerKey(Class clazz, DockerImageName imageName, List methods) {}; + + private static class ContainerOrException { + + private final Supplier> containerSupplier; + private volatile RuntimeException _exception = null; + private volatile GenericContainer _container = null; + + ContainerOrException(Supplier> containerSupplier) { + this.containerSupplier = containerSupplier; + } + + GenericContainer container() { + if (_exception == null && _container == null) { + synchronized (this) { + if (_container == null && _exception == null) { + try { + _container = containerSupplier.get(); + if (_container == null) { + throw new IllegalStateException("testcontainer instance was not constructed"); + } + } catch (RuntimeException e) { + _exception = e; + } + } + } + } + if (_exception != null) { + throw _exception; + } + return _container; + } + + } + + private static final ConcurrentMap SHARED_CONTAINERS = new ConcurrentHashMap<>(); + + private static final MdcScope.Builder TESTCONTAINER_LOG_MDC_BUILDER = new MdcScope.Builder() + .setLogPrefix("testcontainer") + .setPrefixColor(LoggingHelper.Color.RED_BACKGROUND); /** * Creates a new, unshared testcontainer instance. This usually wraps the default constructor for * the testcontainer type. */ - C createNewContainer(DockerImageName imageName); - - /** - * Returns the class object of the testcontainer. - */ - Class getContainerClass(); + protected abstract C createNewContainer(DockerImageName imageName); /** * Returns a shared instance of the testcontainer. */ - default C shared(String imageName, String... methods) { - final String mapKey = Stream.concat( - Stream.of(imageName, this.getClass().getCanonicalName()), - Stream.of(methods)) - .collect(Collectors.joining("+")); - return Singleton.getOrCreate(mapKey, this); + @SuppressWarnings("unchecked") + public final C shared(String imageName, String... methods) { + final var containerKey = new ContainerKey(getClass(), DockerImageName.parse(imageName), Stream.of(methods).toList()); + // We deliberately avoid creating the container itself eagerly during the evaluation of the map + // value. + // Container creation can be exceedingly slow. + // Furthermore, we need to handle exceptions raised during container creation. + ContainerOrException containerOrError = SHARED_CONTAINERS.computeIfAbsent(containerKey, + key -> new ContainerOrException(() -> createAndStartContainer(key.imageName(), key.methods()))); + // Instead, the container creation (if applicable) is deferred to here. + return (C) containerOrError.container(); } /** - * This class is exclusively used by {@link #shared(String, String...)}. It wraps a specific shared - * testcontainer instance, which is created exactly once. + * Returns an exclusive instance of the testcontainer. */ - class Singleton> { - - static private final Logger LOGGER = LoggerFactory.getLogger(Singleton.class); - static private final ConcurrentHashMap> LAZY = new ConcurrentHashMap<>(); - - @SuppressWarnings("unchecked") - static private > C getOrCreate(String mapKey, ContainerFactory factory) { - final Singleton singleton = LAZY.computeIfAbsent(mapKey, Singleton::new); - return ((Singleton) singleton).getOrCreate(factory); - } - - final private String imageName; - final private List methodNames; - - private C sharedContainer; - private RuntimeException containerCreationError; - - private Singleton(String imageNamePlusMethods) { - final String[] parts = imageNamePlusMethods.split("\\+"); - this.imageName = parts[0]; - this.methodNames = Arrays.stream(parts).skip(2).toList(); - } + @SuppressWarnings("unchecked") + public final C exclusive(String imageName, String... methods) { + return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).toList()); + } - private synchronized C getOrCreate(ContainerFactory factory) { - if (sharedContainer == null && containerCreationError == null) { - try { - create(imageName, factory, methodNames); - } catch (RuntimeException e) { - sharedContainer = null; - containerCreationError = e; - } + private GenericContainer createAndStartContainer(DockerImageName imageName, List methodNames) { + LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames); + try { + GenericContainer container = createNewContainer(imageName); + final var methods = new ArrayList(); + for (String methodName : methodNames) { + methods.add(getClass().getMethod(methodName, container.getClass())); } - if (containerCreationError != null) { - throw new RuntimeException( - "Error during container creation for imageName=" + imageName - + ", factory=" + factory.getClass().getName() - + ", methods=" + methodNames, - containerCreationError); - } - return sharedContainer; - } - - private void create(String imageName, ContainerFactory factory, List methodNames) { - LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames); - try { - final var parsed = DockerImageName.parse(imageName); - final var methods = new ArrayList(); - for (String methodName : methodNames) { - methods.add(factory.getClass().getMethod(methodName, factory.getContainerClass())); - } - sharedContainer = factory.createNewContainer(parsed); - sharedContainer.withLogConsumer(new Slf4jLogConsumer(LOGGER)); - for (Method method : methods) { - LOGGER.info("Calling {} in {} on new shared container based on {}.", - method.getName(), factory.getClass().getName(), imageName); - method.invoke(factory, sharedContainer); - } - sharedContainer.start(); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new RuntimeException(e); + final var logConsumer = new Slf4jLogConsumer(LOGGER); + TESTCONTAINER_LOG_MDC_BUILDER.produceMappings(logConsumer::withMdc); + container.withLogConsumer(logConsumer); + for (Method method : methods) { + LOGGER.info("Calling {} in {} on new shared container based on {}.", + method.getName(), getClass().getName(), imageName); + method.invoke(this, container); } + container.start(); + return container; + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); } - } } diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 55f7ec9a3b05..3a75fc17ae82 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -13,9 +13,9 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.16.5' + cdkVersionRequired = '0.16.6' features = ['db-sources'] - useLocalCdk = false + useLocalCdk = true } diff --git a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresContainerFactory.java b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresContainerFactory.java index b92c319d9eec..625af2d4aa1a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresContainerFactory.java +++ b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresContainerFactory.java @@ -12,19 +12,14 @@ import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; -public class PostgresContainerFactory implements ContainerFactory> { +public class PostgresContainerFactory extends ContainerFactory> { @Override - public PostgreSQLContainer createNewContainer(DockerImageName imageName) { + protected PostgreSQLContainer createNewContainer(DockerImageName imageName) { return new PostgreSQLContainer<>(imageName.asCompatibleSubstituteFor("postgres")); } - @Override - public Class getContainerClass() { - return PostgreSQLContainer.class; - } - /** * Apply the postgresql.conf file that we've packaged as a resource. */ diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index b1c0e42651b6..8f514c06e0a5 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 | | 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 | | 3.3.4 | 2024-01-31 | [34723](https://github.com/airbytehq/airbyte/pull/34723) | Adopt CDK v0.16.3 | | 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0 |