From ec24ed33234fd17dccacf5e11d15d3055b6e36b3 Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Wed, 26 Oct 2022 15:48:00 -0700 Subject: [PATCH] Improve performance of the operation that retrieve all connector for a given definition (#18499) --- .../config/persistence/ConfigRepository.java | 32 +++++++++++++++++++ .../ConfigRepositoryE2EReadWriteTest.java | 16 ++++++++++ .../server/handlers/DestinationHandler.java | 13 ++------ .../server/handlers/SourceHandler.java | 9 +----- .../server/handlers/SourceHandlerTest.java | 2 +- 5 files changed, 53 insertions(+), 19 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 7f9a7d011981..2e0f1c33861a 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -666,6 +666,38 @@ public List listWorkspaceDestinationConnection(UUID works return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList()); } + /** + * Returns all active sources using a definition + * + * @param definitionId - id for the definition + * @return sources + * @throws IOException + */ + public List listSourcesForDefinition(UUID definitionId) throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.source)) + .and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildSourceConnection).collect(Collectors.toList()); + } + + /** + * Returns all active destinations using a definition + * + * @param definitionId - id for the definition + * @return destinations + * @throws IOException + */ + public List listDestinationsForDefinition(UUID definitionId) throws IOException { + final Result result = database.query(ctx -> ctx.select(asterisk()) + .from(ACTOR) + .where(ACTOR.ACTOR_TYPE.eq(ActorType.destination)) + .and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId)) + .andNot(ACTOR.TOMBSTONE).fetch()); + return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList()); + } + public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException { return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index e3f3cee96398..77c8b231a738 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -158,6 +158,22 @@ void testWorkspaceCountConnectionsDeprecated() throws IOException { assertEquals(1, configRepository.countConnectionsForWorkspace(workspaceId)); } + @Test + void testFetchActorsUsingDefinition() throws IOException { + UUID destinationDefinitionId = MockData.publicDestinationDefinition().getDestinationDefinitionId(); + UUID sourceDefinitionId = MockData.publicSourceDefinition().getSourceDefinitionId(); + final List destinationConnections = configRepository.listDestinationsForDefinition( + destinationDefinitionId); + final List sourceConnections = configRepository.listSourcesForDefinition( + sourceDefinitionId); + + assertThat(destinationConnections) + .containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals( + destinationDefinitionId) && !d.getTombstone()).collect(Collectors.toList())); + assertThat(sourceConnections).containsExactlyElementsOf(MockData.sourceConnections().stream().filter(d -> d.getSourceDefinitionId().equals( + sourceDefinitionId) && !d.getTombstone()).collect(Collectors.toList())); + } + @Test void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java index 4263e25e5df4..a828448c77a9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationHandler.java @@ -196,12 +196,11 @@ public DestinationRead cloneDestination(final DestinationCloneRequestBody destin public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException { - final List reads = Lists.newArrayList(); + final List reads = Lists.newArrayList(); for (final DestinationConnection dci : configRepository.listWorkspaceDestinationConnection(workspaceIdRequestBody.getWorkspaceId())) { reads.add(buildDestinationRead(dci)); } - return new DestinationReadList().destinations(reads); } @@ -209,14 +208,8 @@ public DestinationReadList listDestinationsForDestinationDefinition(final Destin throws JsonValidationException, IOException, ConfigNotFoundException { final List reads = Lists.newArrayList(); - for (final DestinationConnection destinationConnection : configRepository.listDestinationConnection()) { - if (!destinationConnection.getDestinationDefinitionId().equals(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { - continue; - } - if (destinationConnection.getTombstone() != null && destinationConnection.getTombstone()) { - continue; - } - + for (final DestinationConnection destinationConnection : configRepository + .listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) { reads.add(buildDestinationRead(destinationConnection)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index 767e3e839759..5751d271900c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -17,7 +17,6 @@ import io.airbyte.api.model.generated.SourceSearch; import io.airbyte.api.model.generated.SourceUpdate; import io.airbyte.api.model.generated.WorkspaceIdRequestBody; -import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -176,14 +175,8 @@ public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody works public SourceReadList listSourcesForSourceDefinition(final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody) throws JsonValidationException, IOException, ConfigNotFoundException { - final List sourceConnections = configRepository.listSourceConnection() - .stream() - .filter(sc -> sc.getSourceDefinitionId().equals(sourceDefinitionIdRequestBody.getSourceDefinitionId()) - && !MoreBooleans.isTruthy(sc.getTombstone())) - .toList(); - final List reads = Lists.newArrayList(); - for (final SourceConnection sourceConnection : sourceConnections) { + for (final SourceConnection sourceConnection : configRepository.listSourcesForDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId())) { reads.add(buildSourceRead(sourceConnection)); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java index 49b0651fbdb6..9da285912971 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java @@ -283,7 +283,7 @@ void testListSourcesForSourceDefinition() throws JsonValidationException, Config new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceConnection.getSourceDefinitionId()); when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection); - when(configRepository.listSourceConnection()).thenReturn(Lists.newArrayList(sourceConnection)); + when(configRepository.listSourcesForDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(Lists.newArrayList(sourceConnection)); when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId())) .thenReturn(standardSourceDefinition); when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition);