Skip to content

Commit

Permalink
Add scheduler client that pulls from bucket cache (#5605)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Sep 11, 2021
1 parent efacd5d commit 7970e63
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public interface Configs {

WorkerEnvironment getWorkerEnvironment();

String getSpecCacheBucket();

WorkspaceRetentionConfig getWorkspaceRetentionConfig();

List<WorkerPodToleration> getWorkerPodTolerations();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public class EnvConfigs implements Configs {

private static final Logger LOGGER = LoggerFactory.getLogger(EnvConfigs.class);

// env variable names
public static final String AIRBYTE_ROLE = "AIRBYTE_ROLE";
public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String INTERNAL_API_HOST = "INTERNAL_API_HOST";
public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";
public static final String SPEC_CACHE_BUCKET = "SPEC_CACHE_BUCKET";
public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT";
public static final String LOCAL_ROOT = "LOCAL_ROOT";
Expand Down Expand Up @@ -84,6 +86,9 @@ public class EnvConfigs implements Configs {
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
private static final String RESOURCE_MEMORY_LIMIT = "RESOURCE_MEMORY_LIMIT";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
private static final String DEFAULT_KUBE_NAMESPACE = "default";
private static final String DEFAULT_RESOURCE_REQUIREMENT_CPU = null;
private static final String DEFAULT_RESOURCE_REQUIREMENT_MEMORY = null;
Expand Down Expand Up @@ -245,17 +250,22 @@ public WorkerEnvironment getWorkerEnvironment() {
return getEnvOrDefault(WORKER_ENVIRONMENT, WorkerEnvironment.DOCKER, s -> WorkerEnvironment.valueOf(s.toUpperCase()));
}

@Override
public String getSpecCacheBucket() {
return getEnvOrDefault(SPEC_CACHE_BUCKET, DEFAULT_SPEC_CACHE_BUCKET);
}

@Override
public WorkspaceRetentionConfig getWorkspaceRetentionConfig() {
long minDays = getEnvOrDefault(MINIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS);
long maxDays = getEnvOrDefault(MAXIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS);
long maxSizeMb = getEnvOrDefault(MAXIMUM_WORKSPACE_SIZE_MB, DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB);
final long minDays = getEnvOrDefault(MINIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS);
final long maxDays = getEnvOrDefault(MAXIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS);
final long maxSizeMb = getEnvOrDefault(MAXIMUM_WORKSPACE_SIZE_MB, DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB);

return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}

private WorkerPodToleration workerPodToleration(String tolerationStr) {
Map<String, String> tolerationMap = Splitter.on(",")
private WorkerPodToleration workerPodToleration(final String tolerationStr) {
final Map<String, String> tolerationMap = Splitter.on(",")
.splitToStream(tolerationStr)
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0], s -> s[1]));
Expand Down Expand Up @@ -288,9 +298,9 @@ private WorkerPodToleration workerPodToleration(String tolerationStr) {
*/
@Override
public List<WorkerPodToleration> getWorkerPodTolerations() {
String tolerationsStr = getEnvOrDefault(WORKER_POD_TOLERATIONS, "");
final String tolerationsStr = getEnvOrDefault(WORKER_POD_TOLERATIONS, "");

Stream<String> tolerations = Strings.isNullOrEmpty(tolerationsStr) ? Stream.of()
final Stream<String> tolerations = Strings.isNullOrEmpty(tolerationsStr) ? Stream.of()
: Splitter.on(";")
.splitToStream(tolerationsStr)
.filter(tolerationStr -> !Strings.isNullOrEmpty(tolerationStr));
Expand All @@ -317,7 +327,7 @@ public String getTemporalHost() {

@Override
public Set<Integer> getTemporalWorkerPorts() {
var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, "");
final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, "");
if (ports.isEmpty()) {
return new HashSet<>();
}
Expand Down Expand Up @@ -389,27 +399,27 @@ public String getGoogleApplicationCredentials() {
return getEnvOrDefault(LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, "");
}

private String getEnvOrDefault(String key, String defaultValue) {
private String getEnvOrDefault(final String key, final String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity(), false);
}

private String getEnvOrDefault(String key, String defaultValue, boolean isSecret) {
private String getEnvOrDefault(final String key, final String defaultValue, final boolean isSecret) {
return getEnvOrDefault(key, defaultValue, Function.identity(), isSecret);
}

private long getEnvOrDefault(String key, long defaultValue) {
private long getEnvOrDefault(final String key, final long defaultValue) {
return getEnvOrDefault(key, defaultValue, Long::parseLong, false);
}

private boolean getEnvOrDefault(String key, boolean defaultValue) {
private boolean getEnvOrDefault(final String key, final boolean defaultValue) {
return getEnvOrDefault(key, defaultValue, Boolean::parseBoolean);
}

private <T> T getEnvOrDefault(String key, T defaultValue, Function<String, T> parser) {
private <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser) {
return getEnvOrDefault(key, defaultValue, parser, false);
}

private <T> T getEnvOrDefault(String key, T defaultValue, Function<String, T> parser, boolean isSecret) {
private <T> T getEnvOrDefault(final String key, final T defaultValue, final Function<String, T> parser, final boolean isSecret) {
final String value = getEnv.apply(key);
if (value != null && !value.isEmpty()) {
return parser.apply(value);
Expand Down
3 changes: 3 additions & 0 deletions airbyte-scheduler/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ plugins {

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-scheduler:models')
implementation project(':airbyte-scheduler:persistence')
// todo (cgardens) - remove this dep. just needs temporal client.
implementation project(':airbyte-workers')

compile 'com.google.cloud:google-cloud-storage:2.0.1'

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.client;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.client.util.Preconditions;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteProtocolSchema;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketSpecCacheSchedulerClient implements SynchronousSchedulerClient {

private static final Logger LOGGER = LoggerFactory.getLogger(BucketSpecCacheSchedulerClient.class);

private final SynchronousSchedulerClient client;
private final Function<String, Optional<ConnectorSpecification>> bucketSpecFetcher;

public BucketSpecCacheSchedulerClient(final SynchronousSchedulerClient client, final String bucketName) {
this(
client,
dockerImage -> attemptToFetchSpecFromBucket(StorageOptions.getDefaultInstance().getService(), bucketName, dockerImage));
}

@VisibleForTesting
BucketSpecCacheSchedulerClient(final SynchronousSchedulerClient client,
final Function<String, Optional<ConnectorSpecification>> bucketSpecFetcher) {
this.client = client;
this.bucketSpecFetcher = bucketSpecFetcher;
}

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConnectionJob(final SourceConnection source, final String dockerImage)
throws IOException {
return client.createSourceCheckConnectionJob(source, dockerImage);
}

@Override
public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheckConnectionJob(final DestinationConnection destination,
final String dockerImage)
throws IOException {
return client.createDestinationCheckConnectionJob(destination, dockerImage);
}

@Override
public SynchronousResponse<AirbyteCatalog> createDiscoverSchemaJob(final SourceConnection source, final String dockerImage) throws IOException {
return client.createDiscoverSchemaJob(source, dockerImage);
}

@Override
public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String dockerImage) throws IOException {
Optional<ConnectorSpecification> cachedSpecOptional;
// never want to fail because we could not fetch from off board storage.
try {
cachedSpecOptional = bucketSpecFetcher.apply(dockerImage);
} catch (final RuntimeException e) {
cachedSpecOptional = Optional.empty();
}

if (cachedSpecOptional.isPresent()) {
final long now = Instant.now().toEpochMilli();
final SynchronousJobMetadata mockMetadata = new SynchronousJobMetadata(
UUID.randomUUID(),
ConfigType.GET_SPEC,
null,
now,
now,
true,
Path.of(""));
return new SynchronousResponse<>(cachedSpecOptional.get(), mockMetadata);
} else {
return client.createGetSpecJob(dockerImage);
}
}

private static void validateConfig(final JsonNode json) throws JsonValidationException {
final JsonSchemaValidator jsonSchemaValidator = new JsonSchemaValidator();
final JsonNode specJsonSchema = JsonSchemaValidator.getSchema(AirbyteProtocolSchema.PROTOCOL.getFile(), "ConnectorSpecification");
jsonSchemaValidator.ensure(specJsonSchema, json);
}

private static Optional<ConnectorSpecification> attemptToFetchSpecFromBucket(final Storage storage,
final String bucketName,
final String dockerImage) {
final String[] dockerImageComponents = dockerImage.split(":");
Preconditions.checkArgument(dockerImageComponents.length == 2, "Invalidate docker image: " + dockerImage);
final String dockerImageName = dockerImageComponents[0];
final String dockerImageTag = dockerImageComponents[1];
final Blob specAsBlob =
storage.get(bucketName, Path.of("specs").resolve(dockerImageName).resolve(dockerImageTag).resolve("spec.json").toString());

// if null it means the object was not found.
if (specAsBlob == null) {
LOGGER.warn("Spec not found in bucket storage");
return Optional.empty();
}

final String specAsString = new String(specAsBlob.getContent(), StandardCharsets.UTF_8);
try {
validateConfig(Jsons.deserialize(specAsString));
} catch (final JsonValidationException e) {
LOGGER.error("Received invalid spec from bucket store. Received: {}", specAsString);
return Optional.empty();
}
return Optional.of(Jsons.deserialize(specAsString, ConnectorSpecification.class));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.scheduler.client;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class BucketSpecCacheSchedulerClientTest {

private SynchronousSchedulerClient defaultClientMock;
private Function<String, Optional<ConnectorSpecification>> bucketSpecFetcherMock;

@SuppressWarnings("unchecked")
@BeforeEach
void setup() {
defaultClientMock = mock(SynchronousSchedulerClient.class);
bucketSpecFetcherMock = mock(Function.class);
}

@Test
void testGetsSpecIfPresent() throws IOException {
when(bucketSpecFetcherMock.apply("source-pokeapi:0.1.0")).thenReturn(Optional.of(new ConnectorSpecification()));
final BucketSpecCacheSchedulerClient client = new BucketSpecCacheSchedulerClient(defaultClientMock, bucketSpecFetcherMock);
assertEquals(new ConnectorSpecification(), client.createGetSpecJob("source-pokeapi:0.1.0").getOutput());
verifyNoInteractions(defaultClientMock);
}

@Test
void testCallsDelegateIfNotPresent() throws IOException {
when(bucketSpecFetcherMock.apply("source-pokeapi:0.1.0")).thenReturn(Optional.empty());
when(defaultClientMock.createGetSpecJob("source-pokeapi:0.1.0"))
.thenReturn(new SynchronousResponse<>(new ConnectorSpecification(), mock(SynchronousJobMetadata.class)));
final BucketSpecCacheSchedulerClient client = new BucketSpecCacheSchedulerClient(defaultClientMock, bucketSpecFetcherMock);
assertEquals(new ConnectorSpecification(), client.createGetSpecJob("source-pokeapi:0.1.0").getOutput());
}

@Test
void testCallsDelegateIfException() throws IOException {
when(bucketSpecFetcherMock.apply("source-pokeapi:0.1.0")).thenThrow(new RuntimeException("induced exception"));
when(defaultClientMock.createGetSpecJob("source-pokeapi:0.1.0"))
.thenReturn(new SynchronousResponse<>(new ConnectorSpecification(), mock(SynchronousJobMetadata.class)));
final BucketSpecCacheSchedulerClient client = new BucketSpecCacheSchedulerClient(defaultClientMock, bucketSpecFetcherMock);
assertEquals(new ConnectorSpecification(), client.createGetSpecJob("source-pokeapi:0.1.0").getOutput());
}

// todo (cgardens) - this is essentially an integration test. run it manually to sanity check that
// the client can pull. from the spec cache bucket. when we have a better setup for integation
// testing for the platform we should move it there.
@Disabled
@Test
void testGetsSpecFromBucket() throws IOException {
when(bucketSpecFetcherMock.apply("source-pokeapi:0.1.0")).thenReturn(Optional.of(new ConnectorSpecification()));
// todo (cgardens) - replace with prod bucket.
final BucketSpecCacheSchedulerClient client = new BucketSpecCacheSchedulerClient(defaultClientMock, "cg-specs");
final ConnectorSpecification actualSpec = client.createGetSpecJob("source-pokeapi:0.1.0").getOutput();
assertTrue(actualSpec.getDocumentationUrl().toString().contains("poke"));
}

}
Loading

0 comments on commit 7970e63

Please sign in to comment.