Skip to content

Commit

Permalink
Skip ConfigPersistence for ActorCatalog operations
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Feb 17, 2022
1 parent fe1eb8d commit 9a5b5df
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void load() throws Exception {

final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, null, Optional.empty(), Optional.empty());
new ConfigRepository(configPersistence, null, Optional.empty(), Optional.empty(), configDatabase);

createWorkspaceIfNoneExists(configRepository);
LOGGER.info("Default workspace created..");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
Expand All @@ -30,11 +33,14 @@
import io.airbyte.config.persistence.split_secrets.SecretsHelpers;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -47,6 +53,11 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,15 +72,18 @@ public class ConfigRepository {
private final SecretsHydrator secretsHydrator;
private final Optional<SecretPersistence> longLivedSecretPersistence;
private final Optional<SecretPersistence> ephemeralSecretPersistence;
private final ExceptionWrappingDatabase database;

public ConfigRepository(final ConfigPersistence persistence,
final SecretsHydrator secretsHydrator,
final Optional<SecretPersistence> longLivedSecretPersistence,
final Optional<SecretPersistence> ephemeralSecretPersistence) {
final Optional<SecretPersistence> ephemeralSecretPersistence,
final Database database) {
this.persistence = persistence;
this.secretsHydrator = secretsHydrator;
this.longLivedSecretPersistence = longLivedSecretPersistence;
this.ephemeralSecretPersistence = ephemeralSecretPersistence;
this.database = new ExceptionWrappingDatabase(database);
}

public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final boolean includeTombstone)
Expand Down Expand Up @@ -615,35 +629,86 @@ public List<ActorCatalog> listActorCatalogs()
return actorCatalogs;
}

public void writeCatalog(final AirbyteCatalog catalog,
final UUID sourceId,
final String configurationHash,
final String connectorVersion)
throws JsonValidationException, IOException {
/**
 * Returns the stored actor catalogs whose {@code CATALOG_HASH} column equals the given hash.
 *
 * <p>
 * The result is keyed by catalog id, so more than one entry can come back for a single hash; a
 * caller that needs an exact match has to compare the deserialized catalogs itself.
 *
 * @param catalogHash hash value to match against {@code ACTOR_CATALOG.CATALOG_HASH}
 * @return map from catalog id to the deserialized catalog; empty when no row matches
 * @throws IOException propagated from the database wrapper
 */
public Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash) throws IOException {
  // fetch() is called inside the callback so the query actually executes within the transaction,
  // rather than on a query object handed back after the callback has already returned.
  final Result<Record2<UUID, JSONB>> records = database.transaction(ctx -> ctx.select(ACTOR_CATALOG.ID, ACTOR_CATALOG.CATALOG)
      .from(ACTOR_CATALOG)
      .where(ACTOR_CATALOG.CATALOG_HASH.eq(catalogHash))
      .fetch());

  final Map<UUID, AirbyteCatalog> result = new HashMap<>();
  for (final Record record : records) {
    // The JSONB column is turned back into its textual form before being deserialized.
    final AirbyteCatalog catalog = Jsons.deserialize(
        record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class);
    result.put(record.get(ACTOR_CATALOG.ID), catalog);
  }
  return result;
}

/**
 * Looks up the catalog recorded for an actor at a given connector version and configuration hash.
 *
 * <p>
 * The lookup joins {@code ACTOR_CATALOG} with {@code ACTOR_CATALOG_FETCH_EVENT} and filters the
 * fetch events on all three arguments.
 *
 * @param actorId id of the actor the fetch event was recorded for
 * @param actorVersion connector version stored on the fetch event
 * @param configHash configuration hash stored on the fetch event
 * @return the deserialized catalog, or an empty {@link Optional} when no fetch event matches
 * @throws IOException propagated from the database wrapper
 * @throws IllegalStateException when more than one row matches the three criteria
 */
public Optional<AirbyteCatalog> getActorCatalog(final UUID actorId,
                                                final String actorVersion,
                                                final String configHash)
    throws IOException {
  // fetch() is called inside the callback so the query executes within the transaction instead
  // of after the callback has returned.
  final Result<Record1<JSONB>> records = database.transaction(ctx -> ctx.select(ACTOR_CATALOG.CATALOG)
      .from(ACTOR_CATALOG).join(ACTOR_CATALOG_FETCH_EVENT)
      .on(ACTOR_CATALOG.ID.eq(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID))
      .where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(actorId))
      .and(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION.eq(actorVersion))
      .and(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH.eq(configHash))
      .fetch());

  if (records.size() > 1) {
    // Each joined row counts, so several fetch events with the same key also end up here.
    throw new IllegalStateException(
        String.format("Multiple catalog stored for ActorId=%s,ActorVersion=%s,ConfigHash=%s", actorId.toString(), actorVersion, configHash));
  } else if (records.size() == 1) {
    final JSONB record = records.get(0).get(ACTOR_CATALOG.CATALOG);
    return Optional.of(Jsons.deserialize(record.toString(), AirbyteCatalog.class));
  }
  return Optional.empty();

}

/**
 * Inserts a new row into {@code ACTOR_CATALOG} for the given catalog and returns its generated id.
 *
 * <p>
 * The row stores the serialized catalog as JSONB together with a murmur3 (32-bit) hash of the
 * serialized form. No check for an already stored identical catalog is made here.
 *
 * @param airbyteCatalog catalog to persist
 * @return the randomly generated id of the inserted row
 * @throws IOException propagated from the database wrapper
 */
public UUID writeActorCatalog(final AirbyteCatalog airbyteCatalog) throws IOException {
  final OffsetDateTime timestamp = OffsetDateTime.now();
  final HashFunction hashFunction = Hashing.murmur3_32_fixed();
  final UUID catalogId = UUID.randomUUID();
  // Serialize once and reuse the text for both the hash and the stored JSONB value.
  final String serializedCatalog = Jsons.serialize(airbyteCatalog);
  final String catalogHash = hashFunction.hashBytes(serializedCatalog.getBytes(
      Charsets.UTF_8)).toString();
  // execute() runs inside the callback so the insert happens within the transaction, and the
  // timestamps are set through ACTOR_CATALOG's own columns rather than the fetch-event table's.
  database.transaction(ctx -> ctx.insertInto(ACTOR_CATALOG)
      .set(ACTOR_CATALOG.ID, catalogId)
      .set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(serializedCatalog))
      .set(ACTOR_CATALOG.CATALOG_HASH, catalogHash)
      .set(ACTOR_CATALOG.CREATED_AT, timestamp)
      .set(ACTOR_CATALOG.MODIFIED_AT, timestamp)
      .execute());
  return catalogId;
}

public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog,
final UUID sourceId,
final String connectorVersion,
final String configurationHash)
throws IOException {
final OffsetDateTime timestamp = OffsetDateTime.now();
final HashFunction hashFunction = Hashing.murmur3_32_fixed();
final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
Charsets.UTF_8)).toString();
ActorCatalog actorCatalog = new ActorCatalog()
.withCatalog(Jsons.jsonNode(catalog))
.withId(UUID.randomUUID())
.withCatalogHash(catalogHash);
final Optional<ActorCatalog> existingCatalog = findExistingCatalog(actorCatalog);
if (existingCatalog.isPresent()) {
actorCatalog = existingCatalog.get();
} else {
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
actorCatalog.getId().toString(),
actorCatalog);
}
final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
.withActorCatalogId(actorCatalog.getId())
.withId(UUID.randomUUID())
.withConfigHash(configurationHash)
.withConnectorVersion(connectorVersion)
.withActorId(sourceId);
persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
actorCatalogFetchEvent.getId().toString(),
actorCatalogFetchEvent);
final Map<UUID, AirbyteCatalog> catalogs = findCatalogByHash(catalogHash);
UUID existingCatalogId = null;
for (final Map.Entry<UUID, AirbyteCatalog> entry : catalogs.entrySet()) {
if (entry.getValue().equals(catalog)) {
existingCatalogId = entry.getKey();
}
}

final UUID catalogId = existingCatalogId != null ? existingCatalogId : writeActorCatalog(catalog);
final UUID fetchEventID = UUID.randomUUID();
database.transaction(ctx -> ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, sourceId)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, catalogId)
.set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, configurationHash)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, connectorVersion)
.set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
.set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp)).execute();

return fetchEventID;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

/**
 * End-to-end read/write test for {@link ConfigRepository} that runs against a PostgreSQL instance
 * started through Testcontainers instead of a mocked persistence layer.
 */
public class ConfigRepositoryE2EReadWriteTest {

  // Workspace written before every test; the source connection created in the test refers to it.
  private final StandardWorkspace workspace = new StandardWorkspace()
      .withWorkspaceId(UUID.randomUUID())
      .withName("Default workspace")
      .withSlug("default-workspace")
      .withInitialSetupComplete(true);
  // One container is shared by all tests of this class (started in dbSetup, closed in dbDown).
  private static PostgreSQLContainer<?> container;
  private Database database;
  private ConfigRepository configRepository;
  private DatabaseConfigPersistence configPersistence;

  /**
   * Starts the PostgreSQL container used by every test in this class.
   */
  @BeforeAll
  public static void dbSetup() {
    container = new PostgreSQLContainer<>("postgres:13-alpine")
        .withDatabaseName("airbyte")
        .withUsername("docker")
        .withPassword("docker");
    container.start();
  }

  /**
   * Builds a {@link ConfigRepository} on top of the container database, runs the dev migration
   * helper and writes the default workspace.
   */
  @BeforeEach
  void setup() throws IOException, JsonValidationException {
    final var secretPersistence = new MemorySecretPersistence();
    database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
    configPersistence = spy(new DatabaseConfigPersistence(database));
    // The same Database instance backs both the ConfigPersistence and the repository's direct queries.
    configRepository =
        spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence),
            database));
    final ConfigsDatabaseMigrator configsDatabaseMigrator =
        new ConfigsDatabaseMigrator(database, DatabaseConfigPersistenceLoadDataTest.class.getName());
    final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
    // NOTE(review): presumably brings the schema up to the latest migration - confirm against MigrationDevHelper.
    MigrationDevHelper.runLastMigration(devDatabaseMigrator);
    configRepository.writeStandardWorkspace(workspace);
  }

  /**
   * Shuts the shared PostgreSQL container down once all tests have run.
   */
  @AfterAll
  public static void dbDown() {
    container.close();
  }

  /**
   * Writes a catalog fetch event for a source and checks that the catalog can be read back with the
   * same (actor id, version, config hash) key and is absent for a different version or config hash.
   */
  @Test
  void testSimpleInsertActorCatalog() throws IOException, JsonValidationException {

    final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
        .withSourceDefinitionId(UUID.randomUUID())
        .withSourceType(SourceType.DATABASE)
        .withDockerRepository("docker-repo")
        .withDockerImageTag("1.2.0")
        .withName("sourceDefinition");
    configRepository.writeStandardSourceDefinition(sourceDefinition);

    final SourceConnection source = new SourceConnection()
        .withSourceDefinitionId(sourceDefinition.getSourceDefinitionId())
        .withSourceId(UUID.randomUUID())
        .withName("SomeConnector")
        .withWorkspaceId(workspace.getWorkspaceId())
        .withConfiguration(Jsons.deserialize("{}"));
    final ConnectorSpecification specification = new ConnectorSpecification()
        .withConnectionSpecification(Jsons.deserialize("{}"));
    configRepository.writeSourceConnection(source, specification);

    final AirbyteCatalog actorCatalog = new AirbyteCatalog();
    configRepository.writeActorCatalogFetchEvent(
        actorCatalog, source.getSourceId(), "1.2.0", "ConfigHash");

    final Optional<AirbyteCatalog> catalog =
        configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "ConfigHash");
    assertTrue(catalog.isPresent());
    assertEquals(actorCatalog, catalog.get());
    // The negative cases go through the same lookup as the positive one, so a mismatch on either
    // the version or the config hash is checked against the method under test.
    assertFalse(configRepository.getActorCatalog(source.getSourceId(), "1.3.0", "ConfigHash").isPresent());
    assertFalse(configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash").isPresent());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.config.State;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -47,13 +48,16 @@ class ConfigRepositoryTest {

private ConfigPersistence configPersistence;
private ConfigRepository configRepository;
private Database database;

@BeforeEach
void setup() {
configPersistence = mock(ConfigPersistence.class);
database = mock(Database.class);
final var secretPersistence = new MemorySecretPersistence();
configRepository =
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence)));
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence),
database));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence, configDatabase);

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final JobCleaner jobCleaner = new JobCleaner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final ConfigRepository configRepository =
new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);
new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence, configDatabase);

LOGGER.info("Creating jobs persistence...");
final Database jobDatabase = jobsDatabaseInstance.getInitialized();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() throws Exception {
configPersistence = new DatabaseConfigPersistence(jobDatabase);
configPersistence.replaceAllConfigs(Collections.emptyMap(), false);
configPersistence.loadData(seedPersistence);
configRepository = new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty());
configRepository = new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.empty(), Optional.empty(), configDatabase);

jobPersistence.setVersion(VERSION.serialize());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ private static void launchWorkerApp() throws IOException {
final ConfigPersistence configPersistence = DatabaseConfigPersistence.createWithValidation(configDatabase);
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence, configDatabase);

final Database jobDatabase = new JobsDatabaseInstance(
configs.getDatabaseUser(),
Expand Down

0 comments on commit 9a5b5df

Please sign in to comment.