Skip to content

Commit

Permalink
Merge branch 'master' into liren/destination-namespace-charset
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed Mar 19, 2022
2 parents b9caead + 298551d commit b4d789c
Show file tree
Hide file tree
Showing 108 changed files with 3,519 additions and 654 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.35.57-alpha
current_version = 0.35.58-alpha
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
3 changes: 2 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@


### SHARED ###
VERSION=0.35.57-alpha
VERSION=0.35.58-alpha

# When using the airbyte-db via default docker image
CONFIG_ROOT=/data
Expand Down Expand Up @@ -91,3 +91,4 @@ MAX_DISCOVER_WORKERS=5

### FEATURE FLAGS ###
NEW_SCHEDULER=false
AUTO_DISABLE_FAILING_CONNECTIONS=false
4 changes: 2 additions & 2 deletions airbyte-bootloader/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ ENV APPLICATION airbyte-bootloader

WORKDIR /app

ADD bin/${APPLICATION}-0.35.57-alpha.tar /app
ADD bin/${APPLICATION}-0.35.58-alpha.tar /app

ENTRYPOINT ["/bin/bash", "-c", "${APPLICATION}-0.35.57-alpha/bin/${APPLICATION}"]
ENTRYPOINT ["/bin/bash", "-c", "${APPLICATION}-0.35.58-alpha/bin/${APPLICATION}"]
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.54.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.56.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ public boolean usesNewScheduler() {
return Boolean.parseBoolean(System.getenv("NEW_SCHEDULER"));
}

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));

return Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public interface FeatureFlags {

boolean usesNewScheduler();

boolean autoDisablesFailingConnections();

}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.27
dockerImageTag: 0.3.28
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down Expand Up @@ -221,7 +221,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.20
dockerImageTag: 0.4.21
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
resourceRequirements:
Expand Down
58 changes: 56 additions & 2 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3272,7 +3272,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.27"
- dockerImage: "airbyte/destination-redshift:0.3.28"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down Expand Up @@ -3825,7 +3825,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.20"
- dockerImage: "airbyte/destination-snowflake:0.4.21"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
connectionSpecification:
Expand Down Expand Up @@ -4074,6 +4074,60 @@
airbyte_secret: true
multiline: true
order: 3
- title: "Azure Blob Storage Staging"
additionalProperties: false
description: "Writes large batches of records to a file, uploads the file\
\ to Azure Blob Storage, then uses <pre>COPY INTO table</pre> to upload\
\ the file. Recommended for large production workloads for better speed\
\ and scalability."
required:
- "method"
- "azure_blob_storage_account_name"
- "azure_blob_storage_container_name"
- "azure_blob_storage_sas_token"
properties:
method:
type: "string"
enum:
- "Azure Blob Staging"
default: "Azure Blob Staging"
order: 0
azure_blob_storage_endpoint_domain_name:
title: "Endpoint Domain Name"
type: "string"
default: "blob.core.windows.net"
description: "This is Azure Blob Storage endpoint domain name. Leave\
\ default value (or leave it empty if run container from command\
\ line) to use Microsoft native from example."
examples:
- "blob.core.windows.net"
order: 1
azure_blob_storage_account_name:
title: "Azure Blob Storage Account Name"
type: "string"
description: "The account's name of the Azure Blob Storage."
examples:
- "airbyte5storage"
order: 2
azure_blob_storage_container_name:
title: "Azure blob storage container (Bucket) Name"
type: "string"
description: "The name of the Azure blob storage container. *This\
\ needs to coincide with the container specified in the Snowflake\
\ Storage Integration and Snowflake Azure External Stage (see description\
\ of 'Snowflake Azure External Stage' for details"
examples:
- "airbytetestcontainername"
order: 3
azure_blob_storage_sas_token:
title: "SAS Token"
type: "string"
airbyte_secret: true
description: "Shared access signature(SAS) token to grant Snowflake\
\ limited access to objects in your storage account. See more https://docs.snowflake.com/en/user-guide/data-load-azure-config.html#option-2-generating-a-sas-token"
examples:
- "?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=bgqQwoXwxzuD2GJfagRg7VOS8hzNr3QLT7rhS8OFRLQ%3D"
order: 4
supportsIncremental: true
supportsNormalization: true
supportsDBT: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
- name: HubSpot
sourceDefinitionId: 36c891d9-4bd9-43ac-bad2-10e12756272c
dockerRepository: airbyte/source-hubspot
dockerImageTag: 0.1.48
dockerImageTag: 0.1.49
documentationUrl: https://docs.airbyte.io/integrations/sources/hubspot
icon: hubspot.svg
sourceType: api
Expand Down
10 changes: 5 additions & 5 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3354,7 +3354,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-hubspot:0.1.48"
- dockerImage: "airbyte/source-hubspot:0.1.49"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot"
connectionSpecification:
Expand Down Expand Up @@ -3399,15 +3399,15 @@
client_id:
title: "Client ID"
description: "The Client ID of your HubSpot developer application.\
\ See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\"\
\ See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot#getting-started\"\
>docs</a> if you need help finding this id."
type: "string"
examples:
- "123456789000"
client_secret:
title: "Client Secret"
description: "The Client Secret of your HubSpot developer application.\
\ See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\"\
\ See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot#getting-started\"\
>docs</a> if you need help finding this secret."
type: "string"
examples:
Expand All @@ -3416,7 +3416,7 @@
refresh_token:
title: "Refresh Token"
description: "Refresh Token to renew the expired Access Token. See\
\ our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\"\
\ our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot#getting-started\"\
>docs</a> if you need help generating the token."
type: "string"
examples:
Expand All @@ -3439,7 +3439,7 @@
order: 0
api_key:
title: "API key"
description: "HubSpot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot\"\
description: "HubSpot API Key. See our <a href=\"https://docs.airbyte.io/integrations/sources/hubspot#getting-started\"\
>docs</a> if you need help finding this key."
type: "string"
airbyte_secret: true
Expand Down
12 changes: 12 additions & 0 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ public interface Configs {
*/
Map<String, String> getJobDefaultEnvMap();

/**
* Defines the number of consecutive job failures required before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxFailedJobsInARowBeforeConnectionDisable();

/**
* Defines the required number of days with only failed jobs before a connection is auto-disabled if
* the AUTO_DISABLE_FAILING_CONNECTIONS flag is set to true.
*/
int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable();

// Jobs - Kube only
/**
* Define the check job container's minimum CPU request. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";

private static final String MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = "MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE";
private static final String MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = "MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE";

// job-type-specific overrides
public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS";
public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS";
Expand Down Expand Up @@ -178,6 +181,9 @@ public class EnvConfigs implements Configs {

public static final int DEFAULT_TEMPORAL_HISTORY_RETENTION_IN_DAYS = 30;

public static final int DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE = 100;
public static final int DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE = 14;

private final Function<String, String> getEnv;
private final Supplier<Set<String>> getAllEnvKeys;
private final LogConfigs logConfigs;
Expand Down Expand Up @@ -661,6 +667,16 @@ public Map<String, String> getJobDefaultEnvMap() {
return MoreMaps.merge(jobPrefixedEnvMap, jobSharedEnvMap);
}

@Override
public int getMaxFailedJobsInARowBeforeConnectionDisable() {
return getEnvOrDefault(MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE, DEFAULT_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE);
}

@Override
public int getMaxDaysOfOnlyFailedJobsBeforeConnectionDisable() {
return getEnvOrDefault(MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE, DEFAULT_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE);
}

@Override
public String getCheckJobMainContainerCpuRequest() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION_OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
import static org.jooq.impl.DSL.asterisk;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -88,13 +89,22 @@ public StandardWorkspace getStandardWorkspace(final UUID workspaceId, final bool
}

public Optional<StandardWorkspace> getWorkspaceBySlugOptional(final String slug, final boolean includeTombstone)
throws JsonValidationException, IOException {
for (final StandardWorkspace workspace : listStandardWorkspaces(includeTombstone)) {
if (workspace.getSlug().equals(slug)) {
return Optional.of(workspace);
}
throws IOException {
final Result<Record> result;
if (includeTombstone) {
result = database.query(ctx -> ctx.select(WORKSPACE.asterisk())
.from(WORKSPACE)
.where(WORKSPACE.SLUG.eq(slug))).fetch();
} else {
result = database.query(ctx -> ctx.select(WORKSPACE.asterisk())
.from(WORKSPACE)
.where(WORKSPACE.SLUG.eq(slug)).andNot(WORKSPACE.TOMBSTONE)).fetch();
}
return Optional.empty();

if (result.size() == 0) {
return Optional.empty();
}
return Optional.of(DbConverter.buildStandardWorkspace(result.get(0)));
}

public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean includeTombstone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Notification;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -317,12 +316,7 @@ private List<ConfigWithMetadata<StandardWorkspace>> listStandardWorkspaceWithMet

final List<ConfigWithMetadata<StandardWorkspace>> standardWorkspaces = new ArrayList<>();
for (final Record record : result) {
final List<Notification> notificationList = new ArrayList<>();
final List fetchedNotifications = Jsons.deserialize(record.get(WORKSPACE.NOTIFICATIONS).data(), List.class);
for (final Object notification : fetchedNotifications) {
notificationList.add(Jsons.convertValue(notification, Notification.class));
}
final StandardWorkspace workspace = buildStandardWorkspace(record, notificationList);
final StandardWorkspace workspace = DbConverter.buildStandardWorkspace(record);
standardWorkspaces.add(new ConfigWithMetadata<>(
record.get(WORKSPACE.ID).toString(),
ConfigSchema.STANDARD_WORKSPACE.name(),
Expand All @@ -333,24 +327,6 @@ private List<ConfigWithMetadata<StandardWorkspace>> listStandardWorkspaceWithMet
return standardWorkspaces;
}

private StandardWorkspace buildStandardWorkspace(final Record record, final List<Notification> notificationList) {
return new StandardWorkspace()
.withWorkspaceId(record.get(WORKSPACE.ID))
.withName(record.get(WORKSPACE.NAME))
.withSlug(record.get(WORKSPACE.SLUG))
.withInitialSetupComplete(record.get(WORKSPACE.INITIAL_SETUP_COMPLETE))
.withCustomerId(record.get(WORKSPACE.CUSTOMER_ID))
.withEmail(record.get(WORKSPACE.EMAIL))
.withAnonymousDataCollection(record.get(WORKSPACE.ANONYMOUS_DATA_COLLECTION))
.withNews(record.get(WORKSPACE.SEND_NEWSLETTER))
.withSecurityUpdates(record.get(WORKSPACE.SEND_SECURITY_UPDATES))
.withDisplaySetupWizard(record.get(WORKSPACE.DISPLAY_SETUP_WIZARD))
.withTombstone(record.get(WORKSPACE.TOMBSTONE))
.withNotifications(notificationList)
.withFirstCompletedSync(record.get(WORKSPACE.FIRST_SYNC_COMPLETE))
.withFeedbackDone(record.get(WORKSPACE.FEEDBACK_COMPLETE));
}

private List<ConfigWithMetadata<StandardSourceDefinition>> listStandardSourceDefinitionWithMetadata() throws IOException {
return listStandardSourceDefinitionWithMetadata(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Notification;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.jooq.Record;
Expand Down Expand Up @@ -42,4 +46,27 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
.withResourceRequirements(Jsons.deserialize(record.get(CONNECTION.RESOURCE_REQUIREMENTS).data(), ResourceRequirements.class));
}

public static StandardWorkspace buildStandardWorkspace(final Record record) {
final List<Notification> notificationList = new ArrayList<>();
final List fetchedNotifications = Jsons.deserialize(record.get(WORKSPACE.NOTIFICATIONS).data(), List.class);
for (final Object notification : fetchedNotifications) {
notificationList.add(Jsons.convertValue(notification, Notification.class));
}
return new StandardWorkspace()
.withWorkspaceId(record.get(WORKSPACE.ID))
.withName(record.get(WORKSPACE.NAME))
.withSlug(record.get(WORKSPACE.SLUG))
.withInitialSetupComplete(record.get(WORKSPACE.INITIAL_SETUP_COMPLETE))
.withCustomerId(record.get(WORKSPACE.CUSTOMER_ID))
.withEmail(record.get(WORKSPACE.EMAIL))
.withAnonymousDataCollection(record.get(WORKSPACE.ANONYMOUS_DATA_COLLECTION))
.withNews(record.get(WORKSPACE.SEND_NEWSLETTER))
.withSecurityUpdates(record.get(WORKSPACE.SEND_SECURITY_UPDATES))
.withDisplaySetupWizard(record.get(WORKSPACE.DISPLAY_SETUP_WIZARD))
.withTombstone(record.get(WORKSPACE.TOMBSTONE))
.withNotifications(notificationList)
.withFirstCompletedSync(record.get(WORKSPACE.FIRST_SYNC_COMPLETE))
.withFeedbackDone(record.get(WORKSPACE.FEEDBACK_COMPLETE));
}

}
Loading

0 comments on commit b4d789c

Please sign in to comment.