Skip to content

Commit

Permalink
Revert "Revert "Bmoric/api flag not running auto propagation"" (#6562)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed May 16, 2023
1 parent 4222b85 commit a2745ab
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 87 deletions.
48 changes: 45 additions & 3 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,27 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"

/v1/sources/apply_schema_changes:
post:
tags:
- source
summary:
Auto propagate the change on a catalog to a catalog saved in the DB. It will fetch all the connections linked to
a source id and apply the provided diff to their catalog.
operationId: applySchemaChangeForSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceAutoPropagateChange"
required: true
responses:
"204":
description: The schema was properly auto propagate
"404":
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/sources/write_discover_catalog_result:
post:
tags:
Expand Down Expand Up @@ -3437,6 +3457,28 @@ components:
type: boolean
connectionStatus:
$ref: "#/components/schemas/ConnectionStatus"
SourceAutoPropagateChange:
description:
Input of the source propagation, it contains the discovered catalog and a list of diff that need to be applied
to the existing catalog.
type: object
required:
- catalog
- catalogId
- sourceId
- workspaceId
properties:
catalog:
$ref: "#/components/schemas/AirbyteCatalog"
catalogId:
type: string
format: uuid
sourceId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
SourceSearch:
type: object
properties:
Expand Down Expand Up @@ -5575,8 +5617,8 @@ components:
$ref: "#/components/schemas/NonBreakingChangesPreference"
NonBreakingChangesPreference:
enum:
- ignore
- disable
- ignore # do nothing if we detect a schema change
- disable # disable the connection, pausing the sync
type: string
WebBackendConnectionReadList:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.SourceAutoPropagateChange;
import io.airbyte.api.model.generated.SourceCoreConfig;
import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead;
Expand Down Expand Up @@ -346,6 +347,53 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
.catalogId(currentCatalog.get().getId());
}

public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAutoPropagateChange)
throws IOException, JsonValidationException, ConfigNotFoundException {
if (sourceAutoPropagateChange.getSourceId() == null) {
return;
}

if (sourceAutoPropagateChange.getWorkspaceId() == null
|| sourceAutoPropagateChange.getCatalogId() == null
|| sourceAutoPropagateChange.getCatalog() == null) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.MISSING_APPLY_SCHEMA_CHANGE_INPUT, 1,
new MetricAttribute(MetricTags.SOURCE_ID, sourceAutoPropagateChange.getSourceId().toString()));
return;
}

final boolean autoPropagationIsEnabledForWorkspace =
featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(sourceAutoPropagateChange.getWorkspaceId()));
if (!autoPropagationIsEnabledForWorkspace) {
return;
}

final ConnectionReadList connectionsForSource =
connectionsHandler.listConnectionsForSource(sourceAutoPropagateChange.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog),
sourceAutoPropagateChange.getCatalog(),
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));

final ConnectionUpdate updateObject =
new ConnectionUpdate().connectionId(connectionRead.getConnectionId());

if (shouldAutoPropagate(diff, sourceAutoPropagateChange.getWorkspaceId(), connectionRead)) {
applySchemaChange(sourceAutoPropagateChange.getWorkspaceId(),
updateObject,
currentAirbyteCatalog,
sourceAutoPropagateChange.getCatalog(),
diff.getTransforms(),
sourceAutoPropagateChange.getCatalogId());
}
connectionsHandler.updateConnection(updateObject);
}
}

public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceCreate(final SourceCoreConfig sourceCreate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceCreate.getSourceDefinitionId());
Expand Down Expand Up @@ -444,7 +492,8 @@ public DestinationDefinitionSpecificationRead getSpecificationForDestinationId(f
}

@SuppressWarnings("LineLength")
public DestinationDefinitionSpecificationRead getDestinationSpecification(final DestinationDefinitionIdWithWorkspaceId destinationDefinitionIdWithWorkspaceId)
public DestinationDefinitionSpecificationRead getDestinationSpecification(
final DestinationDefinitionIdWithWorkspaceId destinationDefinitionIdWithWorkspaceId)
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID destinationDefinitionId = destinationDefinitionIdWithWorkspaceId.getDestinationDefinitionId();
final StandardDestinationDefinition destination = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
Expand Down Expand Up @@ -522,6 +571,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco

final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());

final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
Expand All @@ -530,16 +580,6 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
updateObject.status(connectionStatus);

if (!diff.getTransforms().isEmpty() && !containsBreakingChange) {
autoPropagateSchemaChange(workspaceId,
connectionRead.getConnectionId(),
updateObject,
currentAirbyteCatalog,
discoveredSchema.getCatalog(),
diff.getTransforms(),
discoveredSchema.getCatalogId());
}

connectionsHandler.updateConnection(updateObject);

if (shouldNotifySchemaChange(diff, connectionRead, discoverSchemaRequestBody)) {
Expand All @@ -552,24 +592,29 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
}
}

@VisibleForTesting
void autoPropagateSchemaChange(final UUID workspaceId,
final UUID connectionId,
private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspaceId, final ConnectionRead connectionRead) {
final boolean hasDiff = !diff.getTransforms().isEmpty();
final boolean nonBreakingChange = !containsBreakingChange(diff);
final boolean autoPropagationIsEnabledForWorkspace = featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId));
final boolean autoPropagationIsEnabledForConnection =
connectionRead.getNonBreakingChangesPreference() != null;
return hasDiff && nonBreakingChange && autoPropagationIsEnabledForWorkspace && autoPropagationIsEnabledForConnection;
}

private void applySchemaChange(final UUID connectionId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId) {
if (featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId))) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
final io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}

private boolean shouldNotifySchemaChange(final CatalogDiff diff,
Expand Down
Loading

0 comments on commit a2745ab

Please sign in to comment.