Skip to content

Commit

Permalink
Add count connection functions (#10568)
Browse files Browse the repository at this point in the history
* Add count connection functions

* Fix new configRepository queries

- Remove unnecessary joins
- Fix countConnection

* Use existing mock data for tests
  • Loading branch information
malikdiarra authored and etsybaev committed Mar 5, 2022
1 parent ebe5cf4 commit 9e1b5a1
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import com.google.common.hash.HashFunction;
Expand Down Expand Up @@ -32,6 +35,7 @@
import io.airbyte.config.persistence.split_secrets.SplitSecretConfig;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
Expand Down Expand Up @@ -654,6 +658,30 @@ public void writeCatalog(final AirbyteCatalog catalog,
actorCatalogFetchEvent);
}

public int countConnectionsForWorkspace(final UUID workspaceId) throws IOException {
return database.query(ctx -> ctx.selectCount()
.from(CONNECTION)
.join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID))
.where(ACTOR.WORKSPACE_ID.eq(workspaceId))
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
}

public int countSourcesForWorkspace(final UUID workspaceId) throws IOException {
return database.query(ctx -> ctx.selectCount()
.from(ACTOR)
.where(ACTOR.WORKSPACE_ID.equal(workspaceId))
.and(ACTOR.ACTOR_TYPE.eq(ActorType.source))
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
}

public int countDestinationsForWorkspace(final UUID workspaceId) throws IOException {
return database.query(ctx -> ctx.selectCount()
.from(ACTOR)
.where(ACTOR.WORKSPACE_ID.equal(workspaceId))
.and(ACTOR.ACTOR_TYPE.eq(ActorType.destination))
.andNot(ACTOR.TOMBSTONE)).fetchOne().into(int.class);
}

/**
* Converts between a dumpConfig() output and a replaceAllConfigs() input, by deserializing the
* string/jsonnode into the AirbyteConfig, Stream<Object<AirbyteConfig.getClassName()>>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.spy;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.persistence.split_secrets.MemorySecretPersistence;
import io.airbyte.config.persistence.split_secrets.NoOpSecretsHydrator;
import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.ConfigsDatabaseInstance;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import io.airbyte.db.instance.development.MigrationDevHelper;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;

public class ConfigRepositoryE2EReadWriteTest {

private static PostgreSQLContainer<?> container;
private Database database;
private ConfigRepository configRepository;
private DatabaseConfigPersistence configPersistence;

@BeforeAll
public static void dbSetup() {
container = new PostgreSQLContainer<>("postgres:13-alpine")
.withDatabaseName("airbyte")
.withUsername("docker")
.withPassword("docker");
container.start();
}

@BeforeEach
void setup() throws IOException, JsonValidationException {
final var secretPersistence = new MemorySecretPersistence();
database = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize();
configPersistence = spy(new DatabaseConfigPersistence(database));
configRepository =
spy(new ConfigRepository(configPersistence, new NoOpSecretsHydrator(), Optional.of(secretPersistence), Optional.of(secretPersistence),
database));
final ConfigsDatabaseMigrator configsDatabaseMigrator =
new ConfigsDatabaseMigrator(database, DatabaseConfigPersistenceLoadDataTest.class.getName());
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
configRepository.writeStandardWorkspace(MockData.standardWorkspace());
for (final StandardSourceDefinition sourceDefinition : MockData.standardSourceDefinitions()) {
configRepository.writeStandardSourceDefinition(sourceDefinition);
}
for (final StandardDestinationDefinition destinationDefinition : MockData.standardDestinationDefinitions()) {
configRepository.writeStandardDestinationDefinition(destinationDefinition);
}
final ConnectorSpecification specification = new ConnectorSpecification()
.withConnectionSpecification(Jsons.deserialize("{}"));
for (final SourceConnection connection : MockData.sourceConnections()) {
configRepository.writeSourceConnection(connection, specification);
}
for (final DestinationConnection connection : MockData.destinationConnections()) {
configRepository.writeDestinationConnection(connection, specification);
}
for (final StandardSyncOperation operation : MockData.standardSyncOperations()) {
configRepository.writeStandardSyncOperation(operation);
}
for (final StandardSync sync : MockData.standardSyncs()) {
configRepository.writeStandardSync(sync);
}
}

@AfterAll
public static void dbDown() {
container.close();
}

@Test
void testWorkspaceCountConnections() throws IOException {

final UUID workspaceId = MockData.standardWorkspace().getWorkspaceId();
assertEquals(MockData.standardSyncs().size(), configRepository.countConnectionsForWorkspace(workspaceId));
assertEquals(MockData.destinationConnections().size(), configRepository.countDestinationsForWorkspace(workspaceId));
assertEquals(MockData.sourceConnections().size(), configRepository.countSourcesForWorkspace(workspaceId));
}

}

0 comments on commit 9e1b5a1

Please sign in to comment.