
Remove the use of ConfigPersistence for ActorCatalog operation #10387

Merged
8 commits merged on Mar 4, 2022
@@ -5,6 +5,8 @@
package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;

import com.fasterxml.jackson.databind.JsonNode;
@@ -41,6 +43,7 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -53,6 +56,12 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -627,35 +636,111 @@ public List<ActorCatalog> listActorCatalogs()
return actorCatalogs;
}

-  public void writeCatalog(final AirbyteCatalog catalog,
-                           final UUID sourceId,
-                           final String configurationHash,
-                           final String connectorVersion)
-      throws JsonValidationException, IOException {
+  private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, final DSLContext context) {
final Result<Record2<UUID, JSONB>> records = context.select(ACTOR_CATALOG.ID, ACTOR_CATALOG.CATALOG)
.from(ACTOR_CATALOG)
.where(ACTOR_CATALOG.CATALOG_HASH.eq(catalogHash)).fetch();

final Map<UUID, AirbyteCatalog> result = new HashMap<>();
for (final Record record : records) {
final AirbyteCatalog catalog = Jsons.deserialize(
record.get(ACTOR_CATALOG.CATALOG).toString(), AirbyteCatalog.class);
result.put(record.get(ACTOR_CATALOG.ID), catalog);
}
return result;
}

/**
* Store an Airbyte catalog in the DB if it is not already present.
*
* Checks in the config DB whether the catalog is already present; if so, returns its identifier.
* If it is not present, it is inserted into the DB with a new identifier, and that identifier is
* returned.
*
* @param airbyteCatalog An Airbyte catalog to cache
* @param context The jOOQ DSL context used for the DB access
* @return the db identifier for the cached catalog.
*/
private UUID getOrInsertActorCatalog(final AirbyteCatalog airbyteCatalog,
final DSLContext context) {
final OffsetDateTime timestamp = OffsetDateTime.now();
Contributor: nit: I think we usually use Instant and Instant.now() for cases like this. Is there a particular reason why you chose OffsetDateTime here? It's not a big deal either way - both should work fine.

Contributor (author): DatabaseConfigPersistence seems to rely on OffsetDateTime.now() only and does not use Instant at all, so I used the same object to be consistent with other database access patterns.

I just tried using Instant.now() and I'm getting the following error:

no suitable method found for set(TableField<ActorCatalogRecord,OffsetDateTime>,Instant)

We can try to normalize the usage of the DateTime library, but I think that should be handled as a separate ticket/set of PRs.

Contributor: Got it, I did not realize that DatabaseConfigPersistence used this as well. No worries, then let's go with this.
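The type mismatch mentioned in this thread can be sketched in isolation: a jOOQ field typed as OffsetDateTime rejects a raw Instant, but an explicit conversion bridges the two if normalization is ever attempted. This is a minimal illustration with an assumed UTC offset, not code from the PR.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class Timestamps {
  // Explicit conversion from Instant to OffsetDateTime, the type jOOQ maps
  // TIMESTAMP WITH TIME ZONE columns to by default. UTC is assumed here.
  public static OffsetDateTime fromInstant(final Instant instant) {
    return instant.atOffset(ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    final Instant now = Instant.now();
    final OffsetDateTime odt = fromInstant(now);
    // The conversion is lossless: converting back yields the same Instant.
    System.out.println(odt.toInstant().equals(now)); // prints true
  }
}
```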

final HashFunction hashFunction = Hashing.murmur3_32_fixed();
-    final String catalogHash = hashFunction.hashBytes(Jsons.serialize(catalog).getBytes(
+    final String catalogHash = hashFunction.hashBytes(Jsons.serialize(airbyteCatalog).getBytes(
Charsets.UTF_8)).toString();
-    ActorCatalog actorCatalog = new ActorCatalog()
-        .withCatalog(Jsons.jsonNode(catalog))
-        .withId(UUID.randomUUID())
-        .withCatalogHash(catalogHash);
-    final Optional<ActorCatalog> existingCatalog = findExistingCatalog(actorCatalog);
-    if (existingCatalog.isPresent()) {
-      actorCatalog = existingCatalog.get();
-    } else {
-      persistence.writeConfig(ConfigSchema.ACTOR_CATALOG,
-          actorCatalog.getId().toString(),
-          actorCatalog);
-    }
-    final ActorCatalogFetchEvent actorCatalogFetchEvent = new ActorCatalogFetchEvent()
-        .withActorCatalogId(actorCatalog.getId())
-        .withId(UUID.randomUUID())
-        .withConfigHash(configurationHash)
-        .withConnectorVersion(connectorVersion)
-        .withActorId(sourceId);
-    persistence.writeConfig(ConfigSchema.ACTOR_CATALOG_FETCH_EVENT,
-        actorCatalogFetchEvent.getId().toString(),
-        actorCatalogFetchEvent);
final Map<UUID, AirbyteCatalog> catalogs = findCatalogByHash(catalogHash, context);

for (final Map.Entry<UUID, AirbyteCatalog> entry : catalogs.entrySet()) {
if (entry.getValue().equals(airbyteCatalog)) {
return entry.getKey();
}
Comment on lines +672 to +674
Contributor: I don't think this case is covered by the unit test; might be a good idea to add a test case for this.

Contributor (author): I expanded the existing test to add multiple insertions of the same catalog with different version/config hashes.

I decided to check the number of entries in the DB after the function calls, since spying on a private function (getOrInsertActorCatalog) is not possible. What do you think?
}

final UUID catalogId = UUID.randomUUID();
malikdiarra marked this conversation as resolved.
context.insertInto(ACTOR_CATALOG)
.set(ACTOR_CATALOG.ID, catalogId)
.set(ACTOR_CATALOG.CATALOG, JSONB.valueOf(Jsons.serialize(airbyteCatalog)))
.set(ACTOR_CATALOG.CATALOG_HASH, catalogHash)
.set(ACTOR_CATALOG.CREATED_AT, timestamp)
.set(ACTOR_CATALOG.MODIFIED_AT, timestamp).execute();
return catalogId;
}
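The de-duplication logic in getOrInsertActorCatalog can be sketched in memory: a hash narrows the candidate set, and a full equality check then guards against hash collisions before a new identifier is minted. The class and method names below are illustrative (a map stands in for the ACTOR_CATALOG table, and String.hashCode stands in for murmur3); this is a sketch, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CatalogCache {
  // hash -> (id -> serialized catalog), standing in for the catalog table
  // plus its index on catalog_hash.
  private final Map<Integer, Map<UUID, String>> byHash = new HashMap<>();

  public UUID getOrInsert(final String serializedCatalog) {
    final int hash = serializedCatalog.hashCode();
    final Map<UUID, String> candidates = byHash.computeIfAbsent(hash, h -> new HashMap<>());
    for (final Map.Entry<UUID, String> entry : candidates.entrySet()) {
      if (entry.getValue().equals(serializedCatalog)) {
        return entry.getKey(); // already cached: reuse the existing identifier
      }
    }
    final UUID id = UUID.randomUUID();
    candidates.put(id, serializedCatalog);
    return id;
  }

  public static void main(String[] args) {
    final CatalogCache cache = new CatalogCache();
    final UUID first = cache.getOrInsert("{\"streams\":[]}");
    final UUID second = cache.getOrInsert("{\"streams\":[]}");
    System.out.println(first.equals(second)); // prints true: same catalog, same id
  }
}
```

The equality check inside the loop is what makes a hash collision harmless: two distinct catalogs with the same hash still get distinct identifiers.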

public Optional<AirbyteCatalog> getActorCatalog(final UUID actorId,
malikdiarra marked this conversation as resolved.
final String actorVersion,
final String configHash)
throws IOException {
final Result<Record1<JSONB>> records = database.transaction(ctx -> ctx.select(ACTOR_CATALOG.CATALOG)
.from(ACTOR_CATALOG).join(ACTOR_CATALOG_FETCH_EVENT)
.on(ACTOR_CATALOG.ID.eq(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID))
.where(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID.eq(actorId))
.and(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION.eq(actorVersion))
.and(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH.eq(configHash))
.orderBy(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT.desc()).limit(1)).fetch();

if (records.size() >= 1) {
final JSONB record = records.get(0).get(ACTOR_CATALOG.CATALOG);
return Optional.of(Jsons.deserialize(record.toString(), AirbyteCatalog.class));
}
return Optional.empty();

}
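The lookup in getActorCatalog has a simple semantics: among fetch events matching (actorId, actorVersion, configHash), the most recently created one wins, mirroring ORDER BY created_at DESC LIMIT 1. A hedged in-memory sketch, with an illustrative record type standing in for the join on ACTOR_CATALOG_FETCH_EVENT:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class LatestCatalog {
  // Illustrative stand-in for a joined fetch-event row.
  record FetchEvent(UUID actorId, String version, String configHash, long createdAt, String catalog) {}

  public static Optional<String> latest(final List<FetchEvent> events,
                                        final UUID actorId,
                                        final String version,
                                        final String configHash) {
    return events.stream()
        .filter(e -> e.actorId().equals(actorId)
            && e.version().equals(version)
            && e.configHash().equals(configHash))
        .max(Comparator.comparingLong(FetchEvent::createdAt)) // ORDER BY created_at DESC LIMIT 1
        .map(FetchEvent::catalog);
  }

  public static void main(String[] args) {
    final UUID actor = UUID.randomUUID();
    final List<FetchEvent> events = List.of(
        new FetchEvent(actor, "1.2.0", "hash", 1L, "old"),
        new FetchEvent(actor, "1.2.0", "hash", 2L, "new"));
    System.out.println(latest(events, actor, "1.2.0", "hash").orElseThrow()); // prints new
    System.out.println(latest(events, actor, "1.3.0", "hash").isPresent()); // prints false
  }
}
```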

/**
* Stores source catalog information.
*
* This function is called each time the schema of a source is fetched. This can occur because the
* source is set up for the first time, because the configuration or version of the connector
* changed, or because the user explicitly requested a schema refresh. Schemas are stored separately
* and de-duplicated upon insertion. Once a schema has been successfully stored, a call to
* getActorCatalog(actorId, connectorVersion, configurationHash) will return the most recent schema
* stored for those parameters.
*
* @param catalog The catalog fetched from the source
* @param actorId The identifier of the actor (e.g. source) the catalog was fetched from
* @param connectorVersion The version of the connector used for the fetch
* @param configurationHash A hash of the actor configuration used for the fetch
* @return The identifier (UUID) of the fetch event inserted in the database
* @throws IOException
*/
public UUID writeActorCatalogFetchEvent(final AirbyteCatalog catalog,
cgardens marked this conversation as resolved.
Contributor (@lmossman, Mar 3, 2022): It would be helpful if these methods had javadocs describing them. The thing that would be useful to explain is how the ActorCatalogFetchEvent and ActorCatalog tables relate to one another / what a fetch event means and when it is written, etc., since it wasn't really obvious to me just from reading the code.

Contributor (author): Done
final UUID actorId,
final String connectorVersion,
final String configurationHash)
throws IOException {
final OffsetDateTime timestamp = OffsetDateTime.now();
final UUID fetchEventID = UUID.randomUUID();
database.transaction(ctx -> {
final UUID catalogId = getOrInsertActorCatalog(catalog, ctx);
return ctx.insertInto(ACTOR_CATALOG_FETCH_EVENT)
.set(ACTOR_CATALOG_FETCH_EVENT.ID, fetchEventID)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_ID, actorId)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_CATALOG_ID, catalogId)
.set(ACTOR_CATALOG_FETCH_EVENT.CONFIG_HASH, configurationHash)
.set(ACTOR_CATALOG_FETCH_EVENT.ACTOR_VERSION, connectorVersion)
.set(ACTOR_CATALOG_FETCH_EVENT.MODIFIED_AT, timestamp)
.set(ACTOR_CATALOG_FETCH_EVENT.CREATED_AT, timestamp).execute();
});

return fetchEventID;
}
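The key property of writeActorCatalogFetchEvent is that the catalog get-or-insert and the fetch-event insert share one transaction, so an event can never reference an uncommitted catalog id, and repeated fetches of the same schema re-use one catalog row. A hedged in-memory sketch, where maps stand in for the two tables and a trivial transaction wrapper stands in for database.transaction; all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

public class FetchEventWriter {
  // Stand-in for database.transaction(ctx -> ...): a real implementation
  // would begin, commit, and roll back; here the work just runs directly.
  static <T> T transaction(final Supplier<T> work) {
    return work.get();
  }

  // Insert a fetch event pointing at a de-duplicated catalog row, both
  // writes inside one unit of work.
  static UUID recordFetchEvent(final Map<UUID, String> catalogs,
                               final Map<UUID, UUID> fetchEvents,
                               final String serializedCatalog) {
    final UUID eventId = UUID.randomUUID();
    transaction(() -> {
      final UUID catalogId = catalogs.entrySet().stream()
          .filter(e -> e.getValue().equals(serializedCatalog))
          .map(Map.Entry::getKey)
          .findFirst()
          .orElseGet(() -> { // not cached yet: insert under a fresh id
            final UUID id = UUID.randomUUID();
            catalogs.put(id, serializedCatalog);
            return id;
          });
      fetchEvents.put(eventId, catalogId); // event -> catalog reference
      return catalogId;
    });
    return eventId;
  }

  public static void main(String[] args) {
    final Map<UUID, String> catalogs = new HashMap<>();
    final Map<UUID, UUID> fetchEvents = new HashMap<>();
    final UUID e1 = recordFetchEvent(catalogs, fetchEvents, "{}");
    final UUID e2 = recordFetchEvent(catalogs, fetchEvents, "{}");
    // Two fetch events were written, but the catalog row was de-duplicated.
    System.out.println(fetchEvents.size()); // prints 2
    System.out.println(catalogs.size()); // prints 1
    System.out.println(fetchEvents.get(e1).equals(fetchEvents.get(e2))); // prints true
  }
}
```

This mirrors the behavior the test below verifies by counting ACTOR_CATALOG rows after repeated writes.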

public int countConnectionsForWorkspace(final UUID workspaceId) throws IOException {
@@ -4,6 +4,7 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.spy;

@@ -12,18 +13,25 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
@@ -97,4 +105,54 @@ void testWorkspaceCountConnections() throws IOException {
assertEquals(MockData.sourceConnections().size(), configRepository.countSourcesForWorkspace(workspaceId));
}

@Test
void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException {

final StandardWorkspace workspace = MockData.standardWorkspace();

final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()
.withSourceDefinitionId(UUID.randomUUID())
.withSourceType(SourceType.DATABASE)
.withDockerRepository("docker-repo")
.withDockerImageTag("1.2.0")
.withName("sourceDefinition");
configRepository.writeStandardSourceDefinition(sourceDefinition);

final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceDefinition.getSourceDefinitionId())
.withSourceId(UUID.randomUUID())
.withName("SomeConnector")
.withWorkspaceId(workspace.getWorkspaceId())
.withConfiguration(Jsons.deserialize("{}"));
final ConnectorSpecification specification = new ConnectorSpecification()
.withConnectionSpecification(Jsons.deserialize("{}"));
configRepository.writeSourceConnection(source, specification);

final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
configRepository.writeActorCatalogFetchEvent(
actorCatalog, source.getSourceId(), "1.2.0", "ConfigHash");

final Optional<AirbyteCatalog> catalog =
configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "ConfigHash");
assertTrue(catalog.isPresent());
assertEquals(actorCatalog, catalog.get());
assertFalse(configRepository.getActorCatalog(source.getSourceId(), "1.3.0", "ConfigHash").isPresent());
assertFalse(configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash").isPresent());

configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.3.0", "ConfigHash");
final Optional<AirbyteCatalog> catalogNewConnectorVersion =
configRepository.getActorCatalog(source.getSourceId(), "1.3.0", "ConfigHash");
assertTrue(catalogNewConnectorVersion.isPresent());
assertEquals(actorCatalog, catalogNewConnectorVersion.get());

configRepository.writeActorCatalogFetchEvent(actorCatalog, source.getSourceId(), "1.2.0", "OtherConfigHash");
final Optional<AirbyteCatalog> catalogNewConfig =
configRepository.getActorCatalog(source.getSourceId(), "1.2.0", "OtherConfigHash");
assertTrue(catalogNewConfig.isPresent());
assertEquals(actorCatalog, catalogNewConfig.get());

final int catalogDbEntry = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class);
assertEquals(1, catalogDbEntry);
}

}