From ac9096a72b6446fd6534048824dd4d8f9c1df15b Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Fri, 21 Oct 2022 15:42:30 -0700 Subject: [PATCH 1/3] Add function to list actors that use a given definitionId --- .../config/persistence/ConfigRepository.java | 32 +++++++++++++++++++ .../ConfigRepositoryE2EReadWriteTest.java | 15 +++++++++ 2 files changed, 47 insertions(+) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 7f9a7d011981d..2e0f1c33861af 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -666,6 +666,38 @@ public List listWorkspaceDestinationConnection(UUID works return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList()); } + /** + * Returns all active sources using a definition + * + * @param definitionId - id for the definition + * @return sources + * @throws IOException + */ + public List listSourcesForDefinition(UUID definitionId) throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.source)) + .and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildSourceConnection).collect(Collectors.toList()); + } + + /** + * Returns all active destinations using a definition + * + * @param definitionId - id for the definition + * @return destinations + * @throws IOException + */ + public List listDestinationsForDefinition(UUID definitionId) throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.destination)) + .and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList()); + } + public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException { return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index e3f3cee963987..5d520f74521cb 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -158,6 +158,21 @@ void testWorkspaceCountConnectionsDeprecated() throws IOException { assertEquals(1, configRepository.countConnectionsForWorkspace(workspaceId)); } + @Test + void testFetchActorsUsingDefinition() throws IOException { + UUID destinationDefinitionId = MockData.publicDestinationDefinition().getDestinationDefinitionId(); + UUID sourceDefinitionId = MockData.publicSourceDefinition().getSourceDefinitionId(); + final List destinationConnections = configRepository.listDestinationsForDefinition( + destinationDefinitionId); + final List sourceConnections = configRepository.listSourcesForDefinition( + sourceDefinitionId); + + assertThat(destinationConnections).containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals( + destinationDefinitionId) && ! d.getTombstone()).collect(Collectors.toList())); + assertThat(sourceConnections).containsExactlyElementsOf(MockData.sourceConnections().stream().filter(d -> d.getSourceDefinitionId().equals( + sourceDefinitionId) && ! d.getTombstone()).collect(Collectors.toList())); + } + @Test void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException { From 70f00c953c119c28ea26a853ae3feace3141b7c9 Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Tue, 25 Oct 2022 18:11:04 -0700 Subject: [PATCH 2/3] Use new db function in handlers --- .../airbyte/server/handlers/DestinationHandler.java | 12 ++---------- .../io/airbyte/server/handlers/SourceHandler.java | 8 +------- .../airbyte/server/handlers/SourceHandlerTest.java | 2 +- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 4263e25e5df40..c204d4c8849eb 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -196,12 +196,11 @@ public DestinationRead cloneDestination(final DestinationCloneRequestBody destin public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { - final List reads = Lists.newArrayList(); + final List reads = Lists.newArrayList(); for (final DestinationConnection dci : configRepository.listWorkspaceDestinationConnection(workspaceIdRequestBody.getWorkspaceId())) { reads.add(buildDestinationRead(dci)); } - return new DestinationReadList().destinations(reads); } @@ -209,14 +208,7 @@ public DestinationReadList listDestinationsForDestinationDefinition(final Destin throws JsonValidationException, IOException, ConfigNotFoundException { final List reads = Lists.newArrayList(); - for (final DestinationConnection destinationConnection : configRepository.listDestinationConnection()) { - if (!destinationConnection.getDestinationDefinitionId().equals(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { - continue; - } - if (destinationConnection.getTombstone() != null && destinationConnection.getTombstone()) { - continue; - } - + for (final DestinationConnection destinationConnection : configRepository.listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { reads.add(buildDestinationRead(destinationConnection)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 767e3e8397596..cfe5c1b02012b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -176,14 +176,8 @@ public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody works public SourceReadList listSourcesForSourceDefinition(final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody) throws JsonValidationException, IOException, ConfigNotFoundException { - final List sourceConnections = configRepository.listSourceConnection() - .stream() - .filter(sc -> sc.getSourceDefinitionId().equals(sourceDefinitionIdRequestBody.getSourceDefinitionId()) - && !MoreBooleans.isTruthy(sc.getTombstone())) - .toList(); - final List reads = Lists.newArrayList(); - for (final SourceConnection sourceConnection : sourceConnections) { + for (final SourceConnection sourceConnection : configRepository.listSourcesForDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId())) { reads.add(buildSourceRead(sourceConnection)); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java index 49b0651fbdb61..9da285912971e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java @@ -283,7 +283,7 @@ void testListSourcesForSourceDefinition() throws JsonValidationException, Config new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceConnection.getSourceDefinitionId()); when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection); - when(configRepository.listSourceConnection()).thenReturn(Lists.newArrayList(sourceConnection)); + when(configRepository.listSourcesForDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(Lists.newArrayList(sourceConnection)); when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition); From 05e43f7c3fde03480b1341191f0e71d1fae612a4 Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Wed, 26 Oct 2022 10:39:17 -0700 Subject: [PATCH 3/3] Format --- .../persistence/ConfigRepositoryE2EReadWriteTest.java | 7 ++++--- .../io/airbyte/server/handlers/DestinationHandler.java | 3 ++- .../java/io/airbyte/server/handlers/SourceHandler.java | 1 - 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 5d520f74521cb..77c8b231a7385 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -167,10 +167,11 @@ void testFetchActorsUsingDefinition() throws IOException { final List sourceConnections = configRepository.listSourcesForDefinition( sourceDefinitionId); - assertThat(destinationConnections).containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals( - destinationDefinitionId) && ! d.getTombstone()).collect(Collectors.toList())); + assertThat(destinationConnections) + .containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals( + destinationDefinitionId) && !d.getTombstone()).collect(Collectors.toList())); assertThat(sourceConnections).containsExactlyElementsOf(MockData.sourceConnections().stream().filter(d -> d.getSourceDefinitionId().equals( - sourceDefinitionId) && ! d.getTombstone()).collect(Collectors.toList())); + sourceDefinitionId) && !d.getTombstone()).collect(Collectors.toList())); } @Test diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index c204d4c8849eb..a828448c77a9b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -208,7 +208,8 @@ public DestinationReadList listDestinationsForDestinationDefinition(final Destin throws JsonValidationException, IOException, ConfigNotFoundException { final List reads = Lists.newArrayList(); - for (final DestinationConnection destinationConnection : configRepository.listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { + for (final DestinationConnection destinationConnection : configRepository + .listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { reads.add(buildDestinationRead(destinationConnection)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index cfe5c1b02012b..5751d271900cb 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -17,7 +17,6 @@ import io.airbyte.api.model.generated.SourceSearch; import io.airbyte.api.model.generated.SourceUpdate; import io.airbyte.api.model.generated.WorkspaceIdRequestBody; -import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException;