From 39a14b73067fda00acfef8808a9194c94b8036e4 Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Mon, 10 Oct 2022 09:55:31 -0700 Subject: [PATCH] Efficient queries for connection list (#17360) * query once for all needed models, instead of querying within connections loop * cleanup and fix failing tests * pmd fix * fix query and add test * return empty if input list is empty * undo aggressive autoformatting * don't query for connection operations in a loop, instead query once and group-by connectionID in memory * try handling operationIds in a single query instead of two * remove optional * fix operationIds query * very annoying, test was failing because operationIds can be listed in a different order. verify operationIds separately from rest of object * combined queries/functions instead of separate queries for actor and definition * remove leftover lines that aren't doing anything * format * add javadoc * format * use leftjoin so that connections that lack operations aren't left out * clean up comments and format --- .../config/persistence/ConfigRepository.java | 132 ++++++++++++++--- .../DatabaseConfigPersistence.java | 24 +-- .../config/persistence/DbConverter.java | 30 +++- .../ConfigRepositoryE2EReadWriteTest.java | 68 ++++++++- .../airbyte/config/persistence/MockData.java | 2 +- .../job/DefaultJobPersistence.java | 64 +++++++- .../persistence/job/JobPersistence.java | 4 + .../job/DefaultJobPersistenceTest.java | 140 ++++++++++++++++++ .../server/handlers/JobHistoryHandler.java | 12 ++ .../WebBackendConnectionsHandler.java | 74 +++++++-- .../WebBackendConnectionsHandlerTest.java | 25 +++- .../server/helpers/ConnectionHelpers.java | 27 +++- 12 files changed, 524 insertions(+), 78 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 02fc597ec916..c44d04de025a 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -15,9 +15,12 @@ import static io.airbyte.db.instance.configs.jooq.generated.Tables.OPERATION; import static io.airbyte.db.instance.configs.jooq.generated.Tables.WORKSPACE; import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.groupConcat; import static org.jooq.impl.DSL.noCondition; +import static org.jooq.impl.DSL.select; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; @@ -54,6 +57,8 @@ import java.io.IOException; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,6 +87,8 @@ public class ConfigRepository { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class); + private static final String OPERATION_IDS_AGG_FIELD = "operation_ids_agg"; + private static final String OPERATION_IDS_AGG_DELIMITER = ","; private final ConfigPersistence persistence; private final ExceptionWrappingDatabase database; @@ -613,43 +620,73 @@ public void writeStandardSync(final StandardSync standardSync) throws JsonValida persistence.writeConfig(ConfigSchema.STANDARD_SYNC, standardSync.getConnectionId().toString(), standardSync); } - public List listStandardSyncs() throws ConfigNotFoundException, IOException, JsonValidationException { + public List listStandardSyncs() throws IOException, JsonValidationException { return persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); } public List listStandardSyncsUsingOperation(final UUID operationId) throws IOException { - final Result result = database.query(ctx -> ctx.select(CONNECTION.asterisk()) + + final Result connectionAndOperationIdsResult = database.query(ctx -> ctx + // SELECT connection.* plus the connection's associated operationIds as a concatenated list + .select( + CONNECTION.asterisk(), + groupConcat(CONNECTION_OPERATION.OPERATION_ID).separator(OPERATION_IDS_AGG_DELIMITER).as(OPERATION_IDS_AGG_FIELD)) .from(CONNECTION) - .join(CONNECTION_OPERATION) - .on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID)) - .where(CONNECTION_OPERATION.OPERATION_ID.eq(operationId))).fetch(); - return getStandardSyncsFromResult(result); + + // inner join with all connection_operation rows that match the connection's id + .join(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID)) + + // only keep rows for connections that have an operationId that matches the input. + // needs to be a sub query because we want to keep all operationIds for matching connections + // in the main query + .where(CONNECTION.ID.in( + select(CONNECTION.ID).from(CONNECTION).join(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID)) + .where(CONNECTION_OPERATION.OPERATION_ID.eq(operationId)))) + + // group by connection.id so that the groupConcat above works + .groupBy(CONNECTION.ID)).fetch(); + + return getStandardSyncsFromResult(connectionAndOperationIdsResult); } public List listWorkspaceStandardSyncs(final UUID workspaceId, final boolean includeDeleted) throws IOException { - final Result result = database.query(ctx -> ctx.select(CONNECTION.asterisk()) + final Result connectionAndOperationIdsResult = database.query(ctx -> ctx + // SELECT connection.* plus the connection's associated operationIds as a concatenated list + .select( + CONNECTION.asterisk(), + groupConcat(CONNECTION_OPERATION.OPERATION_ID).separator(OPERATION_IDS_AGG_DELIMITER).as(OPERATION_IDS_AGG_FIELD)) .from(CONNECTION) + + // left join with all connection_operation rows that match the connection's id. + // left join includes connections that don't have any connection_operations + .leftJoin(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID)) + + // join with source actors so that we can filter by workspaceId .join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID)) .where(ACTOR.WORKSPACE_ID.eq(workspaceId) - .and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))) - .fetch(); - return getStandardSyncsFromResult(result); + .and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated))) + + // group by connection.id so that the groupConcat above works + .groupBy(CONNECTION.ID)).fetch(); + + return getStandardSyncsFromResult(connectionAndOperationIdsResult); } - private List getStandardSyncsFromResult(final Result result) throws IOException { + private List getStandardSyncsFromResult(final Result connectionAndOperationIdsResult) { final List standardSyncs = new ArrayList<>(); - for (final Record record : result) { - final UUID connectionId = record.get(CONNECTION.ID); - final Result connectionOperationRecords = database.query(ctx -> ctx.select(asterisk()) - .from(CONNECTION_OPERATION) - .where(CONNECTION_OPERATION.CONNECTION_ID.eq(connectionId)) - .fetch()); - final List connectionOperationIds = - connectionOperationRecords.stream().map(r -> r.get(CONNECTION_OPERATION.OPERATION_ID)).collect(Collectors.toList()); - standardSyncs.add(DbConverter.buildStandardSync(record, connectionOperationIds)); + for (final Record record : connectionAndOperationIdsResult) { + final String operationIdsFromRecord = record.get(OPERATION_IDS_AGG_FIELD, String.class); + + // can be null when connection has no connectionOperations + final List operationIds = operationIdsFromRecord == null + ? Collections.emptyList() + : Arrays.stream(operationIdsFromRecord.split(OPERATION_IDS_AGG_DELIMITER)).map(UUID::fromString).toList(); + + standardSyncs.add(DbConverter.buildStandardSync(record, operationIds)); } + return standardSyncs; } @@ -798,6 +835,61 @@ private Map findCatalogByHash(final String catalogHash, fi return result; } + // Data-carrier records to hold combined result of query for a Source or Destination and its + // corresponding Definition. This enables the API layer to + // process combined information about a Source/Destination/Definition pair without requiring two + // separate queries and in-memory join operation, + // because the config models are grouped immediately in the repository layer. + @VisibleForTesting + public record SourceAndDefinition(SourceConnection source, StandardSourceDefinition definition) { + + } + + @VisibleForTesting + public record DestinationAndDefinition(DestinationConnection destination, StandardDestinationDefinition definition) { + + } + + public List getSourceAndDefinitionsFromSourceIds(final List sourceIds) throws IOException { + final Result records = database.query(ctx -> ctx + .select(ACTOR.asterisk(), ACTOR_DEFINITION.asterisk()) + .from(ACTOR) + .join(ACTOR_DEFINITION) + .on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.source), ACTOR.ID.in(sourceIds)) + .fetch()); + + final List sourceAndDefinitions = new ArrayList<>(); + + for (final Record record : records) { + final SourceConnection source = DbConverter.buildSourceConnection(record); + final StandardSourceDefinition definition = DbConverter.buildStandardSourceDefinition(record); + sourceAndDefinitions.add(new SourceAndDefinition(source, definition)); + } + + return sourceAndDefinitions; + } + + public List getDestinationAndDefinitionsFromDestinationIds(final List destinationIds) throws IOException { + final Result records = database.query(ctx -> ctx + .select(ACTOR.asterisk(), ACTOR_DEFINITION.asterisk()) + .from(ACTOR) + .join(ACTOR_DEFINITION) + .on(ACTOR.ACTOR_DEFINITION_ID.eq(ACTOR_DEFINITION.ID)) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.destination), ACTOR.ID.in(destinationIds)) + .fetch()); + + final List destinationAndDefinitions = new ArrayList<>(); + + for (final Record record : records) { + final DestinationConnection destination = DbConverter.buildDestinationConnection(record); + final StandardDestinationDefinition definition = DbConverter.buildStandardDestinationDefinition(record); + destinationAndDefinitions.add(new DestinationAndDefinition(destination, definition)); + } + + return destinationAndDefinitions; + } + public ActorCatalog getActorCatalogById(final UUID actorCatalogId) throws IOException, ConfigNotFoundException { final Result result = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java index d5289f4bd7f4..3332c32fe5b0 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DatabaseConfigPersistence.java @@ -456,7 +456,7 @@ private List> listSourceConnectionWithMetad final List> sourceConnections = new ArrayList<>(); for (final Record record : result) { - final SourceConnection sourceConnection = buildSourceConnection(record); + final SourceConnection sourceConnection = DbConverter.buildSourceConnection(record); sourceConnections.add(new ConfigWithMetadata<>( record.get(ACTOR.ID).toString(), ConfigSchema.SOURCE_CONNECTION.name(), @@ -467,16 +467,6 @@ private List> listSourceConnectionWithMetad return sourceConnections; } - private SourceConnection buildSourceConnection(final Record record) { - return new SourceConnection() - .withSourceId(record.get(ACTOR.ID)) - .withConfiguration(Jsons.deserialize(record.get(ACTOR.CONFIGURATION).data())) - .withWorkspaceId(record.get(ACTOR.WORKSPACE_ID)) - .withSourceDefinitionId(record.get(ACTOR.ACTOR_DEFINITION_ID)) - .withTombstone(record.get(ACTOR.TOMBSTONE)) - .withName(record.get(ACTOR.NAME)); - } - private List> listDestinationConnectionWithMetadata() throws IOException { return listDestinationConnectionWithMetadata(Optional.empty()); } @@ -492,7 +482,7 @@ private List> listDestinationConnectio final List> destinationConnections = new ArrayList<>(); for (final Record record : result) { - final DestinationConnection destinationConnection = buildDestinationConnection(record); + final DestinationConnection destinationConnection = DbConverter.buildDestinationConnection(record); destinationConnections.add(new ConfigWithMetadata<>( record.get(ACTOR.ID).toString(), ConfigSchema.DESTINATION_CONNECTION.name(), @@ -503,16 +493,6 @@ private List> listDestinationConnectio return destinationConnections; } - private DestinationConnection buildDestinationConnection(final Record record) { - return new DestinationConnection() - .withDestinationId(record.get(ACTOR.ID)) - .withConfiguration(Jsons.deserialize(record.get(ACTOR.CONFIGURATION).data())) - .withWorkspaceId(record.get(ACTOR.WORKSPACE_ID)) - .withDestinationDefinitionId(record.get(ACTOR.ACTOR_DEFINITION_ID)) - .withTombstone(record.get(ACTOR.TOMBSTONE)) - .withName(record.get(ACTOR.NAME)); - } - private List> listSourceOauthParamWithMetadata() throws IOException { return listSourceOauthParamWithMetadata(Optional.empty()); } diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java index 358cbb215e3d..f0179e982171 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/DbConverter.java @@ -4,6 +4,7 @@ package io.airbyte.config.persistence; +import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_CATALOG; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_DEFINITION; import static io.airbyte.db.instance.configs.jooq.generated.Tables.ACTOR_OAUTH_PARAMETER; @@ -15,12 +16,14 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ActorCatalog; import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.DestinationConnection; import io.airbyte.config.DestinationOAuthParameter; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Notification; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.Schedule; import io.airbyte.config.ScheduleData; +import io.airbyte.config.SourceConnection; import io.airbyte.config.SourceOAuthParameter; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; @@ -32,15 +35,18 @@ import io.airbyte.config.WorkspaceServiceAccount; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.jooq.Record; +/** + * Provides static methods for converting from repository layer results (often in the form of a jooq + * {@link Record}) to config models. + */ public class DbConverter { - public static StandardSync buildStandardSync(final Record record, final List connectionOperationId) throws IOException { + public static StandardSync buildStandardSync(final Record record, final List connectionOperationId) { return new StandardSync() .withConnectionId(record.get(CONNECTION.ID)) .withNamespaceDefinition( @@ -89,6 +95,26 @@ public static StandardWorkspace buildStandardWorkspace(final Record record) { .withFeedbackDone(record.get(WORKSPACE.FEEDBACK_COMPLETE)); } + public static SourceConnection buildSourceConnection(final Record record) { + return new SourceConnection() + .withSourceId(record.get(ACTOR.ID)) + .withConfiguration(Jsons.deserialize(record.get(ACTOR.CONFIGURATION).data())) + .withWorkspaceId(record.get(ACTOR.WORKSPACE_ID)) + .withSourceDefinitionId(record.get(ACTOR.ACTOR_DEFINITION_ID)) + .withTombstone(record.get(ACTOR.TOMBSTONE)) + .withName(record.get(ACTOR.NAME)); + } + + public static DestinationConnection buildDestinationConnection(final Record record) { + return new DestinationConnection() + .withDestinationId(record.get(ACTOR.ID)) + .withConfiguration(Jsons.deserialize(record.get(ACTOR.CONFIGURATION).data())) + .withWorkspaceId(record.get(ACTOR.WORKSPACE_ID)) + .withDestinationDefinitionId(record.get(ACTOR.ACTOR_DEFINITION_ID)) + .withTombstone(record.get(ACTOR.TOMBSTONE)) + .withName(record.get(ACTOR.NAME)); + } + public static StandardSourceDefinition buildStandardSourceDefinition(final Record record) { return new StandardSourceDefinition() .withSourceDefinitionId(record.get(ACTOR_DEFINITION.ID)) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 7d6043c01d14..c1fe7f1aebaf 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -26,6 +26,8 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; +import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.db.Database; import io.airbyte.db.factory.DSLContextFactory; @@ -56,6 +58,7 @@ import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -85,7 +88,7 @@ public static void dbSetup() { } @BeforeEach - void setup() throws IOException, JsonValidationException, SQLException, DatabaseInitializationException { + void setup() throws IOException, JsonValidationException, SQLException, DatabaseInitializationException, InterruptedException { dataSource = DatabaseConnectionHelper.createDataSource(container); dslContext = DSLContextFactory.create(dataSource, SQLDialect.POSTGRES); flyway = FlywayFactory.create(dataSource, DatabaseConfigPersistenceLoadDataTest.class.getName(), ConfigsDatabaseMigrator.DB_IDENTIFIER, @@ -200,16 +203,18 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, @Test void testListWorkspaceStandardSyncAll() throws IOException { + final List expectedSyncs = MockData.standardSyncs().subList(0, 4); + final List actualSyncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), true); - final List syncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), true); - assertThat(MockData.standardSyncs().subList(0, 4)).hasSameElementsAs(syncs); + assertSyncsMatch(expectedSyncs, actualSyncs); } @Test void testListWorkspaceStandardSyncExcludeDeleted() throws IOException { + final List expectedSyncs = MockData.standardSyncs().subList(0, 3); + final List actualSyncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), false); - final List syncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), false); - assertThat(MockData.standardSyncs().subList(0, 3)).hasSameElementsAs(syncs); + assertSyncsMatch(expectedSyncs, actualSyncs); } @Test @@ -412,12 +417,35 @@ void testMissingSourceOAuthByDefinitionId() throws IOException { @Test void testGetStandardSyncUsingOperation() throws IOException { final UUID operationId = MockData.standardSyncOperations().get(0).getOperationId(); - final List expectedSyncs = MockData.standardSyncs().subList(0, 4); + final List expectedSyncs = MockData.standardSyncs().subList(0, 3); + final List actualSyncs = configRepository.listStandardSyncsUsingOperation(operationId); - final List syncs = configRepository.listStandardSyncsUsingOperation(operationId); + assertSyncsMatch(expectedSyncs, actualSyncs); + } - assertThat(syncs).hasSameElementsAs(expectedSyncs); + private void assertSyncsMatch(final List expectedSyncs, final List actualSyncs) { + assertEquals(expectedSyncs.size(), actualSyncs.size()); + for (final StandardSync expected : expectedSyncs) { + + final Optional maybeActual = actualSyncs.stream().filter(s -> s.getConnectionId().equals(expected.getConnectionId())).findFirst(); + if (maybeActual.isEmpty()) { + Assertions.fail(String.format("Expected to find connectionId %s in result, but actual connectionIds are %s", + expected.getConnectionId(), + actualSyncs.stream().map(StandardSync::getConnectionId).collect(Collectors.toList()))); + } + final StandardSync actual = maybeActual.get(); + + // operationIds can be ordered differently in the query result than in the mock data, so they need + // to be verified separately + // from the rest of the sync. + assertThat(actual.getOperationIds()).hasSameElementsAs(expected.getOperationIds()); + + // now, clear operationIds so the rest of the sync can be compared + expected.setOperationIds(null); + actual.setOperationIds(null); + assertEquals(expected, actual); + } } @Test @@ -439,4 +467,28 @@ void testDeleteStandardSyncOperation() } } + @Test + void testGetSourceAndDefinitionsFromSourceIds() throws IOException { + final List sourceIds = MockData.sourceConnections().subList(0, 2).stream().map(SourceConnection::getSourceId).toList(); + + final List expected = List.of( + new SourceAndDefinition(MockData.sourceConnections().get(0), MockData.standardSourceDefinitions().get(0)), + new SourceAndDefinition(MockData.sourceConnections().get(1), MockData.standardSourceDefinitions().get(1))); + + final List actual = configRepository.getSourceAndDefinitionsFromSourceIds(sourceIds); + assertThat(actual).hasSameElementsAs(expected); + } + + @Test + void testGetDestinationAndDefinitionsFromDestinationIds() throws IOException { + final List destinationIds = MockData.destinationConnections().subList(0, 2).stream().map(DestinationConnection::getDestinationId).toList(); + + final List expected = List.of( + new DestinationAndDefinition(MockData.destinationConnections().get(0), MockData.standardDestinationDefinitions().get(0)), + new DestinationAndDefinition(MockData.destinationConnections().get(1), MockData.standardDestinationDefinitions().get(1))); + + final List actual = configRepository.getDestinationAndDefinitionsFromDestinationIds(destinationIds); + assertThat(actual).hasSameElementsAs(expected); + } + } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java index f191229674f6..83d014032f10 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java @@ -481,7 +481,7 @@ public static List standardSyncs() { .withSchedule(schedule); final StandardSync standardSync4 = new StandardSync() - .withOperationIds(Arrays.asList(OPERATION_ID_1, OPERATION_ID_2)) + .withOperationIds(Collections.emptyList()) .withConnectionId(CONNECTION_ID_4) .withSourceId(SOURCE_ID_2) .withDestinationId(DESTINATION_ID_2) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index dfa93dd2efd1..32eaa4d6037e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -55,6 +55,8 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -110,6 +112,13 @@ public class DefaultJobPersistence implements JobPersistence { private static final String AIRBYTE_METADATA_TABLE = "airbyte_metadata"; public static final String ORDER_BY_JOB_TIME_ATTEMPT_TIME = "ORDER BY jobs.created_at DESC, jobs.id DESC, attempts.created_at ASC, attempts.id ASC "; + public static final String ORDER_BY_JOB_CREATED_AT_DESC = "ORDER BY jobs.created_at DESC "; + public static final String LIMIT_1 = "LIMIT 1 "; + private static final String JOB_STATUS_IS_NON_TERMINAL = String.format("status IN (%s) ", + JobStatus.NON_TERMINAL_STATUSES.stream() + .map(Sqls::toSqlName) + .map(Names::singleQuote) + .collect(Collectors.joining(","))); private final ExceptionWrappingDatabase jobDatabase; private final Supplier timeSupplier; @@ -556,7 +565,7 @@ public Optional getLastReplicationJob(final UUID connectionId) throws IOExc "CAST(jobs.config_type AS VARCHAR) in " + Sqls.toSqlInFragment(Job.REPLICATION_TYPES) + AND + SCOPE_CLAUSE + "CAST(jobs.status AS VARCHAR) <> ? " + - "ORDER BY jobs.created_at DESC LIMIT 1", + ORDER_BY_JOB_CREATED_AT_DESC + LIMIT_1, connectionId.toString(), Sqls.toSqlName(JobStatus.CANCELLED)) .stream() @@ -570,7 +579,7 @@ public Optional getLastSyncJob(final UUID connectionId) throws IOException .fetch(BASE_JOB_SELECT_AND_JOIN + WHERE + "CAST(jobs.config_type AS VARCHAR) = ? " + AND + "scope = ? " + - "ORDER BY jobs.created_at DESC LIMIT 1", + ORDER_BY_JOB_CREATED_AT_DESC + LIMIT_1, Sqls.toSqlName(ConfigType.SYNC), connectionId.toString()) .stream() @@ -578,6 +587,57 @@ public Optional getLastSyncJob(final UUID connectionId) throws IOException .flatMap(r -> getJobOptional(ctx, r.get(JOB_ID, Long.class)))); } + /** + * For each connection ID in the input, find that connection's latest sync job and return it if one + * exists. + */ + @Override + public List getLastSyncJobForConnections(final List connectionIds) throws IOException { + if (connectionIds.isEmpty()) { + return Collections.emptyList(); + } + + return jobDatabase.query(ctx -> ctx + .fetch("SELECT DISTINCT ON (scope) * FROM jobs " + + WHERE + "CAST(jobs.config_type AS VARCHAR) = ? " + + AND + scopeInList(connectionIds) + + "ORDER BY scope, created_at DESC", + Sqls.toSqlName(ConfigType.SYNC)) + .stream() + .flatMap(r -> getJobOptional(ctx, r.get("id", Long.class)).stream()) + .collect(Collectors.toList())); + } + + /** + * For each connection ID in the input, find that connection's most recent non-terminal sync job and + * return it if one exists. + */ + @Override + public List getRunningSyncJobForConnections(final List connectionIds) throws IOException { + if (connectionIds.isEmpty()) { + return Collections.emptyList(); + } + + return jobDatabase.query(ctx -> ctx + .fetch("SELECT DISTINCT ON (scope) * FROM jobs " + + WHERE + "CAST(jobs.config_type AS VARCHAR) = ? " + + AND + scopeInList(connectionIds) + + AND + JOB_STATUS_IS_NON_TERMINAL + + "ORDER BY scope, created_at DESC", + Sqls.toSqlName(ConfigType.SYNC)) + .stream() + .flatMap(r -> getJobOptional(ctx, r.get("id", Long.class)).stream()) + .collect(Collectors.toList())); + } + + private String scopeInList(final Collection connectionIds) { + return String.format("scope IN (%s) ", + connectionIds.stream() + .map(UUID::toString) + .map(Names::singleQuote) + .collect(Collectors.joining(","))); + } + @Override public Optional getFirstReplicationJob(final UUID connectionId) throws IOException { return jobDatabase.query(ctx -> ctx diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 06554863eff0..5744dd695496 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -211,6 +211,10 @@ List listJobStatusAndTimestampWithConnection(UUID con Optional getLastSyncJob(UUID connectionId) throws IOException; + List getLastSyncJobForConnections(final List connectionIds) throws IOException; + + List getRunningSyncJobForConnections(final List connectionIds) throws IOException; + Optional getFirstReplicationJob(UUID connectionId) throws IOException; Optional getNextJob() throws IOException; diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 310ecbdbbb85..8629b307ddf4 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -978,6 +978,146 @@ void testGetLastSyncJobForConnectionIdEmptyBecauseOnlyReset() throws IOException } + @Nested + @DisplayName("When getting the last sync job for multiple connections") + class GetLastSyncJobForConnections { + + private static final UUID CONNECTION_ID_1 = UUID.randomUUID(); + private static final UUID CONNECTION_ID_2 = UUID.randomUUID(); + private static final UUID CONNECTION_ID_3 = UUID.randomUUID(); + private static final String SCOPE_1 = CONNECTION_ID_1.toString(); + private static final String SCOPE_2 = CONNECTION_ID_2.toString(); + private static final String SCOPE_3 = CONNECTION_ID_3.toString(); + private static final List CONNECTION_IDS = List.of(CONNECTION_ID_1, CONNECTION_ID_2, CONNECTION_ID_3); + + @Test + @DisplayName("Should return nothing if no sync job exists") + void testGetLastSyncJobsForConnectionsEmpty() throws IOException { + final List actual = jobPersistence.getLastSyncJobForConnections(CONNECTION_IDS); + + assertTrue(actual.isEmpty()); + } + + @Test + @DisplayName("Should return the last enqueued sync job for each connection") + void testGetLastSyncJobForConnections() throws IOException { + final long scope1Job1 = jobPersistence.enqueueJob(SCOPE_1, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(scope1Job1, jobPersistence.createAttempt(scope1Job1, LOG_PATH)); + + final long scope2Job1 = jobPersistence.enqueueJob(SCOPE_2, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(scope2Job1, jobPersistence.createAttempt(scope2Job1, LOG_PATH)); + + final long scope3Job1 = jobPersistence.enqueueJob(SCOPE_3, SYNC_JOB_CONFIG).orElseThrow(); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + + final long scope1Job2 = jobPersistence.enqueueJob(SCOPE_1, SYNC_JOB_CONFIG).orElseThrow(); + final int scope1Job2AttemptNumber = jobPersistence.createAttempt(scope1Job2, LOG_PATH); + + // should return the latest sync job even if failed + jobPersistence.failAttempt(scope1Job2, scope1Job2AttemptNumber); + final Attempt scope1Job2attempt = jobPersistence.getJob(scope1Job2).getAttempts().stream().findFirst().orElseThrow(); + jobPersistence.failJob(scope1Job2); + + // will leave this job running + final long scope2Job2 = jobPersistence.enqueueJob(SCOPE_2, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.createAttempt(scope2Job2, LOG_PATH); + final Attempt scope2Job2attempt = jobPersistence.getJob(scope2Job2).getAttempts().stream().findFirst().orElseThrow(); + + final List actual = jobPersistence.getLastSyncJobForConnections(CONNECTION_IDS); + final List expected = new ArrayList<>(); + expected.add(createJob(scope1Job2, SYNC_JOB_CONFIG, JobStatus.FAILED, List.of(scope1Job2attempt), afterNow.getEpochSecond(), SCOPE_1)); + expected.add(createJob(scope2Job2, SYNC_JOB_CONFIG, JobStatus.RUNNING, List.of(scope2Job2attempt), afterNow.getEpochSecond(), SCOPE_2)); + expected.add(createJob(scope3Job1, SYNC_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), NOW.getEpochSecond(), SCOPE_3)); + + assertTrue(expected.size() == actual.size() && expected.containsAll(actual) && actual.containsAll(expected)); + } + + @Test + @DisplayName("Should return nothing if only reset job exists") + void testGetLastSyncJobsForConnectionsEmptyBecauseOnlyReset() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE_1, RESET_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH)); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + + final List actual = jobPersistence.getLastSyncJobForConnections(CONNECTION_IDS); + + assertTrue(actual.isEmpty()); + } + + } + + @Nested + @DisplayName("When getting the last running sync job for multiple connections") + class GetRunningSyncJobForConnections { + + private static final UUID CONNECTION_ID_1 = UUID.randomUUID(); + private static final UUID CONNECTION_ID_2 = UUID.randomUUID(); + private static final UUID CONNECTION_ID_3 = UUID.randomUUID(); + private static final String SCOPE_1 = CONNECTION_ID_1.toString(); + private static final String SCOPE_2 = CONNECTION_ID_2.toString(); + private static final String SCOPE_3 = CONNECTION_ID_3.toString(); + private static final List CONNECTION_IDS = List.of(CONNECTION_ID_1, CONNECTION_ID_2, CONNECTION_ID_3); + + @Test + @DisplayName("Should return nothing if no sync job exists") + void testGetRunningSyncJobsForConnectionsEmpty() throws IOException { + final List actual = jobPersistence.getRunningSyncJobForConnections(CONNECTION_IDS); + + assertTrue(actual.isEmpty()); + } + + @Test + @DisplayName("Should return the last running sync job for each connection") + void testGetRunningSyncJobsForConnections() throws IOException { + // succeeded jobs should not be present in the result + final long scope1Job1 = jobPersistence.enqueueJob(SCOPE_1, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(scope1Job1, jobPersistence.createAttempt(scope1Job1, LOG_PATH)); + + // fail scope2's first job, but later start a running job that should show up in the result + final long scope2Job1 = jobPersistence.enqueueJob(SCOPE_2, SYNC_JOB_CONFIG).orElseThrow(); + final int scope2Job1AttemptNumber = jobPersistence.createAttempt(scope2Job1, LOG_PATH); + jobPersistence.failAttempt(scope2Job1, scope2Job1AttemptNumber); + jobPersistence.failJob(scope2Job1); + + // pending jobs should be present in the result + final long scope3Job1 = jobPersistence.enqueueJob(SCOPE_3, SYNC_JOB_CONFIG).orElseThrow(); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + + // create a running job/attempt for scope2 + final long scope2Job2 = jobPersistence.enqueueJob(SCOPE_2, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.createAttempt(scope2Job2, LOG_PATH); + final Attempt scope2Job2attempt = jobPersistence.getJob(scope2Job2).getAttempts().stream().findFirst().orElseThrow(); + + final List expected = new ArrayList<>(); + expected.add(createJob(scope2Job2, SYNC_JOB_CONFIG, JobStatus.RUNNING, List.of(scope2Job2attempt), afterNow.getEpochSecond(), SCOPE_2)); + expected.add(createJob(scope3Job1, SYNC_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), NOW.getEpochSecond(), SCOPE_3)); + + final List actual = jobPersistence.getRunningSyncJobForConnections(CONNECTION_IDS); + assertTrue(expected.size() == actual.size() && expected.containsAll(actual) && actual.containsAll(expected)); + } + + @Test + @DisplayName("Should return nothing if only a running reset job exists") + void testGetRunningSyncJobsForConnectionsEmptyBecauseOnlyReset() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE_1, RESET_JOB_CONFIG).orElseThrow(); + jobPersistence.createAttempt(jobId, LOG_PATH); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + + final List actual = jobPersistence.getRunningSyncJobForConnections(CONNECTION_IDS); + + assertTrue(actual.isEmpty()); + } + + } + @Nested @DisplayName("When getting first replication job") class GetFirstReplicationJob { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 70c56d6709e4..bb4f7bbb551f 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -140,6 +140,18 @@ public Optional getLatestSyncJob(final UUID connectionId) throws IOExce return jobPersistence.getLastSyncJob(connectionId).map(JobConverter::getJobRead); } + public List getLatestSyncJobsForConnections(final List connectionIds) throws IOException { + return jobPersistence.getLastSyncJobForConnections(connectionIds).stream() + .map(JobConverter::getJobRead) + .collect(Collectors.toList()); + } + + public List getRunningSyncJobForConnections(final List connectionIds) throws IOException { + return jobPersistence.getRunningSyncJobForConnections(connectionIds).stream() + .map(JobConverter::getJobRead) + .collect(Collectors.toList()); + } + private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException { final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId()); return sourceHandler.getSource(sourceIdRequestBody); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 88f98c05389e..5a34ab913f1a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -61,6 +61,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -95,19 +97,64 @@ public ConnectionStateType getStateType(final ConnectionIdRequestBody connection return Enums.convertTo(stateHandler.getState(connectionIdRequestBody).getStateType(), ConnectionStateType.class); } - public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) - throws ConfigNotFoundException, IOException, JsonValidationException { + public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws IOException { + + // passing 'false' so that deleted connections are not included + final List standardSyncs = + configRepository.listWorkspaceStandardSyncs(workspaceIdRequestBody.getWorkspaceId(), false); + final Map sourceReadById = + getSourceReadById(standardSyncs.stream().map(StandardSync::getSourceId).toList()); + final Map destinationReadById = + getDestinationReadById(standardSyncs.stream().map(StandardSync::getDestinationId).toList()); + final Map latestJobByConnectionId = + getLatestJobByConnectionId(standardSyncs.stream().map(StandardSync::getConnectionId).toList()); + final Map runningJobByConnectionId = + getRunningJobByConnectionId(standardSyncs.stream().map(StandardSync::getConnectionId).toList()); final List connectionItems = Lists.newArrayList(); - // passing 'false' so that deleted connections are not included - for (final StandardSync standardSync : configRepository.listWorkspaceStandardSyncs(workspaceIdRequestBody.getWorkspaceId(), false)) { - connectionItems.add(buildWebBackendConnectionListItem(standardSync)); + for (final StandardSync standardSync : standardSyncs) { + connectionItems.add( + buildWebBackendConnectionListItem( + standardSync, + sourceReadById, + destinationReadById, + latestJobByConnectionId, + runningJobByConnectionId)); } return new WebBackendConnectionReadList().connections(connectionItems); } + private Map getLatestJobByConnectionId(final List connectionIds) throws IOException { + return jobHistoryHandler.getLatestSyncJobsForConnections(connectionIds).stream() + .collect(Collectors.toMap(j -> UUID.fromString(j.getConfigId()), Function.identity())); + } + + private Map getRunningJobByConnectionId(final List connectionIds) throws IOException { + return jobHistoryHandler.getRunningSyncJobForConnections(connectionIds).stream() + .collect(Collectors.toMap(j -> UUID.fromString(j.getConfigId()), Function.identity())); + } + + private Map getSourceReadById(final List sourceIds) throws IOException { + final List sourceReads = configRepository.getSourceAndDefinitionsFromSourceIds(sourceIds) + .stream() + .map(sourceAndDefinition -> SourceHandler.toSourceRead(sourceAndDefinition.source(), sourceAndDefinition.definition())) + .toList(); + + return sourceReads.stream().collect(Collectors.toMap(SourceRead::getSourceId, Function.identity())); + } + + private Map getDestinationReadById(final List destinationIds) throws IOException { + final List destinationReads = configRepository.getDestinationAndDefinitionsFromDestinationIds(destinationIds) + .stream() + .map(destinationAndDefinition -> DestinationHandler.toDestinationRead(destinationAndDefinition.destination(), + destinationAndDefinition.definition())) + .toList(); + + return destinationReads.stream().collect(Collectors.toMap(DestinationRead::getDestinationId, Function.identity())); + } + private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionRead connectionRead) throws ConfigNotFoundException, IOException, JsonValidationException { final SourceRead source = getSourceRead(connectionRead.getSourceId()); @@ -129,12 +176,17 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR return webBackendConnectionRead; } - private WebBackendConnectionListItem buildWebBackendConnectionListItem(final StandardSync standardSync) - throws JsonValidationException, ConfigNotFoundException, IOException { - final SourceRead source = getSourceRead(standardSync.getSourceId()); - final DestinationRead destination = getDestinationRead(standardSync.getDestinationId()); - final Optional latestSyncJob = jobHistoryHandler.getLatestSyncJob(standardSync.getConnectionId()); - final Optional latestRunningSyncJob = jobHistoryHandler.getLatestRunningSyncJob(standardSync.getConnectionId()); + private WebBackendConnectionListItem buildWebBackendConnectionListItem( + final StandardSync standardSync, + final Map sourceReadById, + final Map destinationReadById, + final Map latestJobByConnectionId, + final Map runningJobByConnectionId) { + + final SourceRead source = sourceReadById.get(standardSync.getSourceId()); + final DestinationRead destination = destinationReadById.get(standardSync.getDestinationId()); + final Optional latestSyncJob = Optional.ofNullable(latestJobByConnectionId.get(standardSync.getConnectionId())); + final Optional latestRunningSyncJob = Optional.ofNullable(runningJobByConnectionId.get(standardSync.getConnectionId())); final WebBackendConnectionListItem listItem = new WebBackendConnectionListItem() .connectionId(standardSync.getConnectionId()) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index f4c3d48652a6..49b4f2d09c47 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -72,6 +72,8 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition; +import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; @@ -148,19 +150,24 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio eventRunner, configRepository); - final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); - standardSourceDefinition.setIcon(SOURCE_ICON); - final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); + final StandardSourceDefinition sourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); + sourceDefinition.setIcon(SOURCE_ICON); + final SourceConnection source = SourceHelpers.generateSource(sourceDefinition.getSourceDefinitionId()); + sourceRead = SourceHelpers.getSourceRead(source, sourceDefinition); final StandardDestinationDefinition destinationDefinition = DestinationDefinitionHelpers.generateDestination(); destinationDefinition.setIcon(DESTINATION_ICON); - final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); + final DestinationConnection destination = DestinationHelpers.generateDestination(destinationDefinition.getDestinationDefinitionId()); final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, destinationDefinition); - final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); + final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceAndDestinationId(source.getSourceId(), destination.getDestinationId()); when(configRepository.listWorkspaceStandardSyncs(sourceRead.getWorkspaceId(), false)) .thenReturn(Collections.singletonList(standardSync)); + when(configRepository.getSourceAndDefinitionsFromSourceIds(Collections.singletonList(source.getSourceId()))) + .thenReturn(Collections.singletonList(new SourceAndDefinition(source, sourceDefinition))); + when(configRepository.getDestinationAndDefinitionsFromDestinationIds(Collections.singletonList(destination.getDestinationId()))) + .thenReturn(Collections.singletonList(new DestinationAndDefinition(destination, destinationDefinition))); + connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); operationReadList = new OperationReadList() .operations(List.of(new OperationRead() @@ -195,6 +202,9 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio when(jobHistoryHandler.getLatestSyncJob(connectionRead.getConnectionId())).thenReturn(Optional.of(jobRead.getJob())); + when(jobHistoryHandler.getLatestSyncJobsForConnections(Collections.singletonList(connectionRead.getConnectionId()))) + .thenReturn(Collections.singletonList(jobRead.getJob())); + expectedListItem = ConnectionHelpers.generateExpectedWebBackendConnectionListItem( standardSync, sourceRead, @@ -300,8 +310,7 @@ void testGetWorkspaceStateEmpty() throws IOException { } @Test - void testWebBackendListConnectionsForWorkspace() throws ConfigNotFoundException, IOException, - JsonValidationException { + void testWebBackendListConnectionsForWorkspace() throws IOException { final WorkspaceIdRequestBody workspaceIdRequestBody = new WorkspaceIdRequestBody(); workspaceIdRequestBody.setWorkspaceId(sourceRead.getWorkspaceId()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java index 042b537dc1ed..c6bac34a1e33 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/ConnectionHelpers.java @@ -54,6 +54,8 @@ public class ConnectionHelpers { private static final String BASIC_SCHEDULE_DATA_TIME_UNITS = "days"; private static final long BASIC_SCHEDULE_DATA_UNITS = 1L; private static final String ONE_HUNDRED_G = "100g"; + private static final String STANDARD_SYNC_NAME = "presto to hudi"; + private static final String STANDARD_SYNC_PREFIX = "presto_to_hudi"; public static final StreamDescriptor STREAM_DESCRIPTOR = new StreamDescriptor().withName(STREAM_NAME); @@ -70,10 +72,10 @@ public static StandardSync generateSyncWithSourceId(final UUID sourceId) { return new StandardSync() .withConnectionId(connectionId) - .withName("presto to hudi") + .withName(STANDARD_SYNC_NAME) .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) .withNamespaceFormat(null) - .withPrefix("presto_to_hudi") + .withPrefix(STANDARD_SYNC_PREFIX) .withStatus(StandardSync.Status.ACTIVE) .withCatalog(generateBasicConfiguredAirbyteCatalog()) .withSourceId(sourceId) @@ -89,10 +91,10 @@ public static StandardSync generateSyncWithDestinationId(final UUID destinationI return new StandardSync() .withConnectionId(connectionId) - .withName("presto to hudi") + .withName(STANDARD_SYNC_NAME) .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) .withNamespaceFormat(null) - .withPrefix("presto_to_hudi") + .withPrefix(STANDARD_SYNC_PREFIX) .withStatus(StandardSync.Status.ACTIVE) .withCatalog(generateBasicConfiguredAirbyteCatalog()) .withSourceId(UUID.randomUUID()) @@ -101,6 +103,23 @@ public static StandardSync generateSyncWithDestinationId(final UUID destinationI .withManual(true); } + public static StandardSync generateSyncWithSourceAndDestinationId(final UUID sourceId, final UUID destinationId) { + final UUID connectionId = UUID.randomUUID(); + + return new StandardSync() + .withConnectionId(connectionId) + .withName(STANDARD_SYNC_NAME) + .withNamespaceDefinition(NamespaceDefinitionType.SOURCE) + .withNamespaceFormat(null) + .withPrefix(STANDARD_SYNC_PREFIX) + .withStatus(StandardSync.Status.ACTIVE) + .withCatalog(generateBasicConfiguredAirbyteCatalog()) + .withSourceId(sourceId) + .withDestinationId(destinationId) + .withOperationIds(List.of(UUID.randomUUID())) + .withManual(true); + } + public static ConnectionSchedule generateBasicConnectionSchedule() { return new ConnectionSchedule() .timeUnit(ConnectionSchedule.TimeUnitEnum.fromValue(BASIC_SCHEDULE_TIME_UNIT))