Skip to content

Commit

Permalink
make exclusive containers first class citizens (airbytehq#34892)
Browse files Browse the repository at this point in the history
Co-authored-by: Marius Posta <marius@airbyte.io>
  • Loading branch information
2 people authored and jatinyadav-cc committed Feb 21, 2024
1 parent 3496cb1 commit 6bf0ae7
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 93 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.16.6 | 2024-02-07 | [\#34892](https://github.com/airbytehq/airbyte/pull/34892) | Improved testcontainers logging and support for unshared containers. |
| 0.16.5 | 2024-02-07 | [\#34948](https://github.com/airbytehq/airbyte/pull/34948) | Fix source state stats counting logic |
| 0.16.4 | 2024-02-01 | [\#34727](https://github.com/airbytehq/airbyte/pull/34727) | Add future based stdout consumer in BaseTypingDedupingTest |
| 0.16.3 | 2024-01-30 | [\#34669](https://github.com/airbytehq/airbyte/pull/34669) | Fix org.apache.logging.log4j:log4j-slf4j-impl version conflicts. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public enum Color {
YELLOW_BACKGROUND("\u001b[43m"), // destination
GREEN_BACKGROUND("\u001b[42m"), // normalization
CYAN_BACKGROUND("\u001b[46m"), // container runner
RED_BACKGROUND("\u001b[41m"), // testcontainers
PURPLE_BACKGROUND("\u001b[45m"); // dbt

private final String ansi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.slf4j.MDC;

/**
Expand Down Expand Up @@ -70,22 +71,24 @@ public Builder setSimple(final boolean simple) {
return this;
}

public MdcScope build() {
final Map<String, String> extraMdcEntries = new HashMap<>();

public void produceMappings(final BiConsumer<String, String> mdcConsumer) {
maybeLogPrefix.ifPresent(logPrefix -> {
final String potentiallyColoredLog = maybePrefixColor
.map(color -> LoggingHelper.applyColor(color, logPrefix))
.orElse(logPrefix);

extraMdcEntries.put(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog);
mdcConsumer.accept(LoggingHelper.LOG_SOURCE_MDC_KEY, potentiallyColoredLog);

if (simple) {
// outputs much less information for this line. see log4j2.xml to see exactly what this does
extraMdcEntries.put("simple", "true");
mdcConsumer.accept("simple", "true");
}
});
}

public MdcScope build() {
final Map<String, String> extraMdcEntries = new HashMap<>();
produceMappings(extraMdcEntries::put);
return new MdcScope(extraMdcEntries);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.16.5
version=0.16.6
Original file line number Diff line number Diff line change
Expand Up @@ -4,115 +4,123 @@

package io.airbyte.cdk.testutils;

import io.airbyte.commons.logging.LoggingHelper;
import io.airbyte.commons.logging.MdcScope;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/**
* ContainerFactory is the companion interface to {@link TestDatabase} for providing it with
* suitable testcontainer instances.
* ContainerFactory is the companion to {@link TestDatabase} and provides it with suitable
* testcontainer instances.
*/
public interface ContainerFactory<C extends JdbcDatabaseContainer<?>> {
public abstract class ContainerFactory<C extends JdbcDatabaseContainer<?>> {

static private final Logger LOGGER = LoggerFactory.getLogger(ContainerFactory.class);

private record ContainerKey(Class<? extends ContainerFactory> clazz, DockerImageName imageName, List<String> methods) {};

private static class ContainerOrException {

private final Supplier<GenericContainer<?>> containerSupplier;
private volatile RuntimeException _exception = null;
private volatile GenericContainer<?> _container = null;

ContainerOrException(Supplier<GenericContainer<?>> containerSupplier) {
this.containerSupplier = containerSupplier;
}

GenericContainer<?> container() {
if (_exception == null && _container == null) {
synchronized (this) {
if (_container == null && _exception == null) {
try {
_container = containerSupplier.get();
if (_container == null) {
throw new IllegalStateException("testcontainer instance was not constructed");
}
} catch (RuntimeException e) {
_exception = e;
}
}
}
}
if (_exception != null) {
throw _exception;
}
return _container;
}

}

private static final ConcurrentMap<ContainerKey, ContainerOrException> SHARED_CONTAINERS = new ConcurrentHashMap<>();

private static final MdcScope.Builder TESTCONTAINER_LOG_MDC_BUILDER = new MdcScope.Builder()
.setLogPrefix("testcontainer")
.setPrefixColor(LoggingHelper.Color.RED_BACKGROUND);

/**
* Creates a new, unshared testcontainer instance. This usually wraps the default constructor for
* the testcontainer type.
*/
C createNewContainer(DockerImageName imageName);

/**
* Returns the class object of the testcontainer.
*/
Class<?> getContainerClass();
protected abstract C createNewContainer(DockerImageName imageName);

/**
* Returns a shared instance of the testcontainer.
*/
default C shared(String imageName, String... methods) {
final String mapKey = Stream.concat(
Stream.of(imageName, this.getClass().getCanonicalName()),
Stream.of(methods))
.collect(Collectors.joining("+"));
return Singleton.getOrCreate(mapKey, this);
@SuppressWarnings("unchecked")
public final C shared(String imageName, String... methods) {
final var containerKey = new ContainerKey(getClass(), DockerImageName.parse(imageName), Stream.of(methods).toList());
// We deliberately avoid creating the container itself eagerly during the evaluation of the map
// value.
// Container creation can be exceedingly slow.
// Furthermore, we need to handle exceptions raised during container creation.
ContainerOrException containerOrError = SHARED_CONTAINERS.computeIfAbsent(containerKey,
key -> new ContainerOrException(() -> createAndStartContainer(key.imageName(), key.methods())));
// Instead, the container creation (if applicable) is deferred to here.
return (C) containerOrError.container();
}

/**
* This class is exclusively used by {@link #shared(String, String...)}. It wraps a specific shared
* testcontainer instance, which is created exactly once.
* Returns an exclusive instance of the testcontainer.
*/
class Singleton<C extends JdbcDatabaseContainer<?>> {

static private final Logger LOGGER = LoggerFactory.getLogger(Singleton.class);
static private final ConcurrentHashMap<String, Singleton<?>> LAZY = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
static private <C extends JdbcDatabaseContainer<?>> C getOrCreate(String mapKey, ContainerFactory<C> factory) {
final Singleton<?> singleton = LAZY.computeIfAbsent(mapKey, Singleton<C>::new);
return ((Singleton<C>) singleton).getOrCreate(factory);
}

final private String imageName;
final private List<String> methodNames;

private C sharedContainer;
private RuntimeException containerCreationError;

private Singleton(String imageNamePlusMethods) {
final String[] parts = imageNamePlusMethods.split("\\+");
this.imageName = parts[0];
this.methodNames = Arrays.stream(parts).skip(2).toList();
}
@SuppressWarnings("unchecked")
public final C exclusive(String imageName, String... methods) {
return (C) createAndStartContainer(DockerImageName.parse(imageName), Stream.of(methods).toList());
}

private synchronized C getOrCreate(ContainerFactory<C> factory) {
if (sharedContainer == null && containerCreationError == null) {
try {
create(imageName, factory, methodNames);
} catch (RuntimeException e) {
sharedContainer = null;
containerCreationError = e;
}
private GenericContainer<?> createAndStartContainer(DockerImageName imageName, List<String> methodNames) {
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
try {
GenericContainer<?> container = createNewContainer(imageName);
final var methods = new ArrayList<Method>();
for (String methodName : methodNames) {
methods.add(getClass().getMethod(methodName, container.getClass()));
}
if (containerCreationError != null) {
throw new RuntimeException(
"Error during container creation for imageName=" + imageName
+ ", factory=" + factory.getClass().getName()
+ ", methods=" + methodNames,
containerCreationError);
}
return sharedContainer;
}

private void create(String imageName, ContainerFactory<C> factory, List<String> methodNames) {
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methodNames);
try {
final var parsed = DockerImageName.parse(imageName);
final var methods = new ArrayList<Method>();
for (String methodName : methodNames) {
methods.add(factory.getClass().getMethod(methodName, factory.getContainerClass()));
}
sharedContainer = factory.createNewContainer(parsed);
sharedContainer.withLogConsumer(new Slf4jLogConsumer(LOGGER));
for (Method method : methods) {
LOGGER.info("Calling {} in {} on new shared container based on {}.",
method.getName(), factory.getClass().getName(), imageName);
method.invoke(factory, sharedContainer);
}
sharedContainer.start();
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
final var logConsumer = new Slf4jLogConsumer(LOGGER);
TESTCONTAINER_LOG_MDC_BUILDER.produceMappings(logConsumer::withMdc);
container.withLogConsumer(logConsumer);
for (Method method : methods) {
LOGGER.info("Calling {} in {} on new shared container based on {}.",
method.getName(), getClass().getName(), imageName);
method.invoke(this, container);
}
container.start();
return container;
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}

}

}
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-postgres/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.16.5'
cdkVersionRequired = '0.16.6'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

public class PostgresContainerFactory implements ContainerFactory<PostgreSQLContainer<?>> {
public class PostgresContainerFactory extends ContainerFactory<PostgreSQLContainer<?>> {

@Override
public PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
protected PostgreSQLContainer<?> createNewContainer(DockerImageName imageName) {
return new PostgreSQLContainer<>(imageName.asCompatibleSubstituteFor("postgres"));

}

@Override
public Class<?> getContainerClass() {
return PostgreSQLContainer.class;
}

/**
* Apply the postgresql.conf file that we've packaged as a resource.
*/
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |
| 3.3.4 | 2024-01-31 | [34723](https://github.com/airbytehq/airbyte/pull/34723) | Adopt CDK v0.16.3 |
| 3.3.3 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0 |
Expand Down

0 comments on commit 6bf0ae7

Please sign in to comment.