From 79f43f5fb33bdcd0a5b5018fcdfd1ba86fc70150 Mon Sep 17 00:00:00 2001 From: jrhizor Date: Tue, 20 Jul 2021 11:00:02 -0700 Subject: [PATCH 1/3] add workspace helper --- .../server/helpers/WorkspaceHelper.java | 123 +++++++++++ .../server/helpers/WorkspaceHelperTest.java | 197 ++++++++++++++++++ 2 files changed, 320 insertions(+) create mode 100644 airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java create mode 100644 airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java diff --git a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java new file mode 100644 index 0000000000000..7923427d89ef0 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java @@ -0,0 +1,123 @@ +package io.airbyte.server.helpers; + +import com.fasterxml.jackson.databind.jsonschema.JsonSchema; +import com.google.common.base.Preconditions; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.airbyte.api.model.ConnectionIdRequestBody; +import io.airbyte.api.model.ConnectionRead; +import io.airbyte.api.model.DestinationIdRequestBody; +import io.airbyte.api.model.DestinationRead; +import io.airbyte.api.model.SourceIdRequestBody; +import io.airbyte.api.model.SourceRead; +import io.airbyte.config.JobConfig; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.server.converters.SpecFetcher; +import io.airbyte.server.handlers.ConnectionsHandler; +import io.airbyte.server.handlers.DestinationHandler; +import io.airbyte.server.handlers.SourceHandler; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import org.apache.commons.lang3.NotImplementedException; + +import java.io.IOException; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class WorkspaceHelper { + + private final ConnectionsHandler connectionsHandler; + private final SourceHandler sourceHandler; + private final DestinationHandler destinationHandler; + + private final LoadingCache sourceToWorkspaceCache; + private final LoadingCache destinationToWorkspaceCache; + private final LoadingCache connectionToWorkspaceCache; + private final LoadingCache jobToWorkspaceCache; + + public WorkspaceHelper(ConfigRepository configRepository, JobPersistence jobPersistence, JsonSchemaValidator jsonSchemaValidator, SpecFetcher specFetcher) { + this.connectionsHandler = new ConnectionsHandler(configRepository); + this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + + this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + @Override + public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { + final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)); + return source.getWorkspaceId(); + } + }); + + this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + @Override + public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { + final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId)); + return destination.getWorkspaceId(); + } + }); + + this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + @Override + public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException { + final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID sourceId = connection.getSourceId(); + final UUID destinationId = connection.getDestinationId(); + return getWorkspaceForConnection(sourceId, destinationId); + } + }); + + this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + @Override + public UUID load(Long jobId) throws IOException, ExecutionException { + final Job job = jobPersistence.getJob(jobId); + if(job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) { + return getWorkspaceForConnectionId(UUID.fromString(job.getScope())); + } else { + throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!"); + } + } + }); + } + + public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException { + return sourceToWorkspaceCache.get(sourceId); + } + + public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException { + return destinationToWorkspaceCache.get(destinationId); + } + + public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException { + return jobToWorkspaceCache.get(jobId); + } + + public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException { + final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId); + final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId); + + Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!"); + return sourceWorkspace; + } + + public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException { + return connectionToWorkspaceCache.get(connectionId); + } + + public UUID getWorkspaceForOperationId(UUID operationId) { + throw new NotImplementedException(); + } + + private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { + return CacheBuilder.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .build(cacheLoader); + } + +} \ No newline at end of file diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java new file mode 100644 index 0000000000000..2e47e165ccde1 --- /dev/null +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java @@ -0,0 +1,197 @@ +package io.airbyte.server.helpers; + +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.config.DestinationConnection; +import io.airbyte.config.JobConfig; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.SourceConnection; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardSync; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.FileSystemConfigPersistence; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.models.JobStatus; +import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.server.converters.SpecFetcher; +import io.airbyte.validation.json.JsonSchemaValidator; +import io.airbyte.validation.json.JsonValidationException; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class WorkspaceHelperTest { + + Path tmpDir; + ConfigRepository configRepository; + JobPersistence jobPersistence; + WorkspaceHelper workspaceHelper; + + @BeforeEach + public void setup() throws IOException { + tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); + + configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); + jobPersistence = mock(JobPersistence.class); + + SpecFetcher specFetcher = mock(SpecFetcher.class); + when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}"))); + + workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher); + } + + @Test + public void testObjectsThatDoNotExist() { + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); + assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); + // todo: add operationId check + } + + @Test + public void testSource() throws IOException, ExecutionException, JsonValidationException { + UUID source = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testDestination() throws IOException, ExecutionException, JsonValidationException { + UUID destination = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException { + UUID workspace = UUID.randomUUID(); + + // set up source + UUID source = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + + // set up destination + UUID destination = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + + // set up connection + UUID connection = UUID.randomUUID(); + configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source).withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>()))); + + // test retrieving by connection id + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspace); + + // test retrieving by source and destination ids + UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspaceBySourceAndDestination); + + // check that caching is working + UUID newWorkspace = UUID.randomUUID(); + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace)); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + + // test jobs + long jobId = 123; + Job job = new Job( + jobId, + JobConfig.ConfigType.SYNC, + connection.toString(), + new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()), + new ArrayList<>(), + JobStatus.PENDING, + System.currentTimeMillis(), + System.currentTimeMillis(), + System.currentTimeMillis() + ); + when(jobPersistence.getJob(jobId)).thenReturn(job); + + UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); + assertEquals(workspace, jobWorkspace); + } +} \ No newline at end of file From 37f76da9ce39ff8aa8e4093103dd20e73b297f18 Mon Sep 17 00:00:00 2001 From: jrhizor Date: Tue, 20 Jul 2021 12:21:00 -0700 Subject: [PATCH 2/3] fmt --- .../server/helpers/WorkspaceHelper.java | 216 ++++++----- .../server/helpers/WorkspaceHelperTest.java | 356 ++++++++++-------- 2 files changed, 313 insertions(+), 259 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java index 7923427d89ef0..14d954b06b475 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java +++ b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java @@ -1,6 +1,29 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.server.helpers; -import com.fasterxml.jackson.databind.jsonschema.JsonSchema; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -13,7 +36,6 @@ import io.airbyte.api.model.SourceRead; import io.airbyte.config.JobConfig; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigPersistence; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobPersistence; @@ -23,101 +45,111 @@ import io.airbyte.server.handlers.SourceHandler; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; -import org.apache.commons.lang3.NotImplementedException; - import java.io.IOException; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.NotImplementedException; public class WorkspaceHelper { - private final ConnectionsHandler connectionsHandler; - private final SourceHandler sourceHandler; - private final DestinationHandler destinationHandler; - - private final LoadingCache sourceToWorkspaceCache; - private final LoadingCache destinationToWorkspaceCache; - private final LoadingCache connectionToWorkspaceCache; - private final LoadingCache jobToWorkspaceCache; - - public WorkspaceHelper(ConfigRepository configRepository, JobPersistence jobPersistence, JsonSchemaValidator jsonSchemaValidator, SpecFetcher specFetcher) { - this.connectionsHandler = new ConnectionsHandler(configRepository); - this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); - this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); - - this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() { - @Override - public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { - final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)); - return source.getWorkspaceId(); - } - }); - - this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { - @Override - public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { - final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId)); - return destination.getWorkspaceId(); - } - }); - - this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() { - @Override - public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException { - final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - final UUID sourceId = connection.getSourceId(); - final UUID destinationId = connection.getDestinationId(); - return getWorkspaceForConnection(sourceId, destinationId); - } - }); - - this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() { - @Override - public UUID load(Long jobId) throws IOException, ExecutionException { - final Job job = jobPersistence.getJob(jobId); - if(job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) { - return getWorkspaceForConnectionId(UUID.fromString(job.getScope())); - } else { - throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!"); - } - } - }); - } - - public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException { - return sourceToWorkspaceCache.get(sourceId); - } - - public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException { - return destinationToWorkspaceCache.get(destinationId); - } - - public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException { - return jobToWorkspaceCache.get(jobId); - } - - public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException { - final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId); - final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId); - - Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!"); - return sourceWorkspace; - } - - public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException { - return connectionToWorkspaceCache.get(connectionId); - } - - public UUID getWorkspaceForOperationId(UUID operationId) { - throw new NotImplementedException(); - } - - private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { - return CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) - .build(cacheLoader); - } - -} \ No newline at end of file + private final ConnectionsHandler connectionsHandler; + private final SourceHandler sourceHandler; + private final DestinationHandler destinationHandler; + + private final LoadingCache sourceToWorkspaceCache; + private final LoadingCache destinationToWorkspaceCache; + private final LoadingCache connectionToWorkspaceCache; + private final LoadingCache jobToWorkspaceCache; + + public WorkspaceHelper(ConfigRepository configRepository, + JobPersistence jobPersistence, + JsonSchemaValidator jsonSchemaValidator, + SpecFetcher specFetcher) { + this.connectionsHandler = new ConnectionsHandler(configRepository); + this.sourceHandler = new SourceHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + this.destinationHandler = new DestinationHandler(configRepository, jsonSchemaValidator, specFetcher, connectionsHandler); + + this.sourceToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID sourceId) throws JsonValidationException, ConfigNotFoundException, IOException { + final SourceRead source = sourceHandler.getSource(new SourceIdRequestBody().sourceId(sourceId)); + return source.getWorkspaceId(); + } + + }); + + this.destinationToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID destinationId) throws JsonValidationException, ConfigNotFoundException, IOException { + final DestinationRead destination = destinationHandler.getDestination(new DestinationIdRequestBody().destinationId(destinationId)); + return destination.getWorkspaceId(); + } + + }); + + this.connectionToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException, ExecutionException { + final ConnectionRead connection = connectionsHandler.getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID sourceId = connection.getSourceId(); + final UUID destinationId = connection.getDestinationId(); + return getWorkspaceForConnection(sourceId, destinationId); + } + + }); + + this.jobToWorkspaceCache = getExpiringCache(new CacheLoader<>() { + + @Override + public UUID load(Long jobId) throws IOException, ExecutionException { + final Job job = jobPersistence.getJob(jobId); + if (job.getConfigType() == JobConfig.ConfigType.SYNC || job.getConfigType() == JobConfig.ConfigType.RESET_CONNECTION) { + return getWorkspaceForConnectionId(UUID.fromString(job.getScope())); + } else { + throw new IllegalArgumentException("Only sync/reset jobs are associated with workspaces! A " + job.getConfigType() + " job was requested!"); + } + } + + }); + } + + public UUID getWorkspaceForSourceId(UUID sourceId) throws ExecutionException { + return sourceToWorkspaceCache.get(sourceId); + } + + public UUID getWorkspaceForDestinationId(UUID destinationId) throws ExecutionException { + return destinationToWorkspaceCache.get(destinationId); + } + + public UUID getWorkspaceForJobId(Long jobId) throws IOException, ExecutionException { + return jobToWorkspaceCache.get(jobId); + } + + public UUID getWorkspaceForConnection(UUID sourceId, UUID destinationId) throws ExecutionException { + final UUID sourceWorkspace = getWorkspaceForSourceId(sourceId); + final UUID destinationWorkspace = getWorkspaceForDestinationId(destinationId); + + Preconditions.checkArgument(Objects.equals(sourceWorkspace, destinationWorkspace), "Source and destination must be from the same workspace!"); + return sourceWorkspace; + } + + public UUID getWorkspaceForConnectionId(UUID connectionId) throws ExecutionException { + return connectionToWorkspaceCache.get(connectionId); + } + + public UUID getWorkspaceForOperationId(UUID operationId) { + throw new NotImplementedException(); + } + + private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { + return CacheBuilder.newBuilder() + .expireAfterAccess(5, TimeUnit.MINUTES) + .build(cacheLoader); + } + +} diff --git a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java index 2e47e165ccde1..a2d716f6d91d4 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/helpers/WorkspaceHelperTest.java @@ -1,8 +1,36 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package io.airbyte.server.helpers; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.google.common.util.concurrent.UncheckedExecutionException; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobSyncConfig; @@ -20,178 +48,172 @@ import io.airbyte.server.converters.SpecFetcher; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; -import org.apache.commons.lang3.RandomStringUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.ExecutionException; - -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; class WorkspaceHelperTest { - Path tmpDir; - ConfigRepository configRepository; - JobPersistence jobPersistence; - WorkspaceHelper workspaceHelper; - - @BeforeEach - public void setup() throws IOException { - tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); - - configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); - jobPersistence = mock(JobPersistence.class); - - SpecFetcher specFetcher = mock(SpecFetcher.class); - when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}"))); - - workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher); - } - - @Test - public void testObjectsThatDoNotExist() { - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); - assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); - assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); - // todo: add operationId check - } - - @Test - public void testSource() throws IOException, ExecutionException, JsonValidationException { - UUID source = UUID.randomUUID(); - UUID workspace = UUID.randomUUID(); - - UUID sourceDefinition = UUID.randomUUID(); - configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); - - SourceConnection sourceConnection = new SourceConnection() - .withSourceId(source) - .withSourceDefinitionId(sourceDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("source") - .withTombstone(false); - - configRepository.writeSourceConnection(sourceConnection); - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source); - - assertEquals(workspace, retrievedWorkspace); - - // check that caching is working - configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID())); - UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); - } - - @Test - public void testDestination() throws IOException, ExecutionException, JsonValidationException { - UUID destination = UUID.randomUUID(); - UUID workspace = UUID.randomUUID(); - - UUID destinationDefinition = UUID.randomUUID(); - configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); - - DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(destination) - .withDestinationDefinitionId(destinationDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("dest") - .withTombstone(false); - - configRepository.writeDestinationConnection(destinationConnection); - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination); - - assertEquals(workspace, retrievedWorkspace); - - // check that caching is working - configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID())); - UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); - } - - @Test - public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException { - UUID workspace = UUID.randomUUID(); - - // set up source - UUID source = UUID.randomUUID(); - - UUID sourceDefinition = UUID.randomUUID(); - configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); - - SourceConnection sourceConnection = new SourceConnection() - .withSourceId(source) - .withSourceDefinitionId(sourceDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("source") - .withTombstone(false); - - configRepository.writeSourceConnection(sourceConnection); - - // set up destination - UUID destination = UUID.randomUUID(); - - UUID destinationDefinition = UUID.randomUUID(); - configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); - - DestinationConnection destinationConnection = new DestinationConnection() - .withDestinationId(destination) - .withDestinationDefinitionId(destinationDefinition) - .withWorkspaceId(workspace) - .withConfiguration(Jsons.deserialize("{}")) - .withName("dest") - .withTombstone(false); - - configRepository.writeDestinationConnection(destinationConnection); - - // set up connection - UUID connection = UUID.randomUUID(); - configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source).withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>()))); - - // test retrieving by connection id - UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection); - assertEquals(workspace, retrievedWorkspace); - - // test retrieving by source and destination ids - UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection); - assertEquals(workspace, retrievedWorkspaceBySourceAndDestination); - - // check that caching is working - UUID newWorkspace = UUID.randomUUID(); - configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace)); - configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace)); - UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); - assertEquals(workspace, retrievedWorkspaceAfterUpdate); - - // test jobs - long jobId = 123; - Job job = new Job( - jobId, - JobConfig.ConfigType.SYNC, - connection.toString(), - new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()), - new ArrayList<>(), - JobStatus.PENDING, - System.currentTimeMillis(), - System.currentTimeMillis(), - System.currentTimeMillis() - ); - when(jobPersistence.getJob(jobId)).thenReturn(job); - - UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); - assertEquals(workspace, jobWorkspace); - } -} \ No newline at end of file + Path tmpDir; + ConfigRepository configRepository; + JobPersistence jobPersistence; + WorkspaceHelper workspaceHelper; + + @BeforeEach + public void setup() throws IOException { + tmpDir = Files.createTempDirectory("workspace_helper_test_" + RandomStringUtils.randomAlphabetic(5)); + + configRepository = new ConfigRepository(new FileSystemConfigPersistence(tmpDir)); + jobPersistence = mock(JobPersistence.class); + + SpecFetcher specFetcher = mock(SpecFetcher.class); + when(specFetcher.execute(any())).thenReturn(new ConnectorSpecification().withConnectionSpecification(Jsons.deserialize("{}"))); + + workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence, new JsonSchemaValidator(), specFetcher); + } + + @Test + public void testObjectsThatDoNotExist() { + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForSourceId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForDestinationId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnectionId(UUID.randomUUID())); + assertThrows(ExecutionException.class, () -> workspaceHelper.getWorkspaceForConnection(UUID.randomUUID(), UUID.randomUUID())); + assertThrows(UncheckedExecutionException.class, () -> workspaceHelper.getWorkspaceForJobId(0L)); + // todo: add operationId check + } + + @Test + public void testSource() throws IOException, ExecutionException, JsonValidationException { + UUID source = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceId(source); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceId(source); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testDestination() throws IOException, ExecutionException, JsonValidationException { + UUID destination = UUID.randomUUID(); + UUID workspace = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForDestinationId(destination); + + assertEquals(workspace, retrievedWorkspace); + + // check that caching is working + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(UUID.randomUUID())); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + } + + @Test + public void testConnectionAndJobs() throws IOException, ExecutionException, JsonValidationException { + UUID workspace = UUID.randomUUID(); + + // set up source + UUID source = UUID.randomUUID(); + + UUID sourceDefinition = UUID.randomUUID(); + configRepository.writeStandardSource(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinition)); + + SourceConnection sourceConnection = new SourceConnection() + .withSourceId(source) + .withSourceDefinitionId(sourceDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("source") + .withTombstone(false); + + configRepository.writeSourceConnection(sourceConnection); + + // set up destination + UUID destination = UUID.randomUUID(); + + UUID destinationDefinition = UUID.randomUUID(); + configRepository.writeStandardDestinationDefinition(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinition)); + + DestinationConnection destinationConnection = new DestinationConnection() + .withDestinationId(destination) + .withDestinationDefinitionId(destinationDefinition) + .withWorkspaceId(workspace) + .withConfiguration(Jsons.deserialize("{}")) + .withName("dest") + .withTombstone(false); + + configRepository.writeDestinationConnection(destinationConnection); + + // set up connection + UUID connection = UUID.randomUUID(); + configRepository.writeStandardSync(new StandardSync().withManual(true).withConnectionId(connection).withSourceId(source) + .withDestinationId(destination).withCatalog(new ConfiguredAirbyteCatalog().withStreams(new ArrayList<>()))); + + // test retrieving by connection id + UUID retrievedWorkspace = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspace); + + // test retrieving by source and destination ids + UUID retrievedWorkspaceBySourceAndDestination = workspaceHelper.getWorkspaceForConnectionId(connection); + assertEquals(workspace, retrievedWorkspaceBySourceAndDestination); + + // check that caching is working + UUID newWorkspace = UUID.randomUUID(); + configRepository.writeSourceConnection(sourceConnection.withWorkspaceId(newWorkspace)); + configRepository.writeDestinationConnection(destinationConnection.withWorkspaceId(newWorkspace)); + UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationId(destination); + assertEquals(workspace, retrievedWorkspaceAfterUpdate); + + // test jobs + long jobId = 123; + Job job = new Job( + jobId, + JobConfig.ConfigType.SYNC, + connection.toString(), + new JobConfig().withConfigType(JobConfig.ConfigType.SYNC).withSync(new JobSyncConfig()), + new ArrayList<>(), + JobStatus.PENDING, + System.currentTimeMillis(), + System.currentTimeMillis(), + System.currentTimeMillis()); + when(jobPersistence.getJob(jobId)).thenReturn(job); + + UUID jobWorkspace = workspaceHelper.getWorkspaceForJobId(jobId); + assertEquals(workspace, jobWorkspace); + } + +} From d080a236f2da7277e148fdff2e1cece3d49d6d8e Mon Sep 17 00:00:00 2001 From: jrhizor Date: Tue, 20 Jul 2021 16:20:10 -0700 Subject: [PATCH 3/3] switch to a fixed limit --- .../main/java/io/airbyte/server/helpers/WorkspaceHelper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java index 14d954b06b475..19cf1b34ba7a9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java +++ b/airbyte-server/src/main/java/io/airbyte/server/helpers/WorkspaceHelper.java @@ -49,7 +49,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.NotImplementedException; public class WorkspaceHelper { @@ -148,7 +147,7 @@ public UUID getWorkspaceForOperationId(UUID operationId) { private static LoadingCache getExpiringCache(CacheLoader cacheLoader) { return CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) + .maximumSize(20000) .build(cacheLoader); }