Skip to content

Commit

Permalink
make schema autopropagation configurable through the api (#6357)
Browse files Browse the repository at this point in the history
Co-authored-by: benmoriceau <benoit@airbyte.io>
  • Loading branch information
mfsiega-airbyte and benmoriceau committed May 23, 2023
1 parent 29e8d13 commit 46a2ce5
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 72 deletions.
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5781,6 +5781,8 @@ components:
enum:
- ignore # do nothing if we detect a schema change
- disable # disable the connection, pausing the sync
- propagate_columns # automatically propagate the changes on selected streams and continue syncing
- propagate_fully # automatically propagate the changes including new streams and continue syncing
type: string
WebBackendConnectionReadList:
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,10 @@ public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAut
currentAirbyteCatalog,
sourceAutoPropagateChange.getCatalog(),
diff.getTransforms(),
sourceAutoPropagateChange.getCatalogId());
sourceAutoPropagateChange.getCatalogId(),
connectionRead.getNonBreakingChangesPreference());
connectionsHandler.updateConnection(updateObject);
}
connectionsHandler.updateConnection(updateObject);
}
}

Expand Down Expand Up @@ -597,7 +598,9 @@ private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspace
final boolean nonBreakingChange = !containsBreakingChange(diff);
final boolean autoPropagationIsEnabledForWorkspace = featureFlagClient.boolVariation(AutoPropagateSchema.INSTANCE, new Workspace(workspaceId));
final boolean autoPropagationIsEnabledForConnection =
connectionRead.getNonBreakingChangesPreference() != null;
connectionRead.getNonBreakingChangesPreference() != null
&& (connectionRead.getNonBreakingChangesPreference().equals(NonBreakingChangesPreference.PROPAGATE_COLUMNS)
|| connectionRead.getNonBreakingChangesPreference().equals(NonBreakingChangesPreference.PROPAGATE_FULLY));
return hasDiff && nonBreakingChange && autoPropagationIsEnabledForWorkspace && autoPropagationIsEnabledForConnection;
}

Expand All @@ -606,13 +609,15 @@ private void applySchemaChange(final UUID connectionId,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId) {
final UUID sourceCatalogId,
final NonBreakingChangesPreference nonBreakingChangesPreference) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
final io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
newCatalog,
transformations);
transformations,
nonBreakingChangesPreference);
updateObject.setSyncCatalog(catalog);
updateObject.setSourceCatalogId(sourceCatalogId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.commons.json.Jsons;
Expand All @@ -30,22 +31,32 @@ public class AutoPropagateSchemaChangeHelper {
* @param oldCatalog the currently saved catalog
* @param newCatalog the new catalog, which contains all the stream even the unselected ones
* @param transformations list of transformation per stream
* @param nonBreakingChangesPreference User preference for the auto propagation
* @return an Airbyte catalog the changes being auto propagated
*/
public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
final AirbyteCatalog newCatalog,
final List<StreamTransform> transformations) {
AirbyteCatalog copiedOldCatalog = Jsons.clone(oldCatalog);
Map<StreamDescriptor, AirbyteStreamAndConfiguration> oldCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(copiedOldCatalog);
Map<StreamDescriptor, AirbyteStreamAndConfiguration> newCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(newCatalog);
final List<StreamTransform> transformations,
final NonBreakingChangesPreference nonBreakingChangesPreference) {
final AirbyteCatalog copiedOldCatalog = Jsons.clone(oldCatalog);
final Map<StreamDescriptor, AirbyteStreamAndConfiguration> oldCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(copiedOldCatalog);
final Map<StreamDescriptor, AirbyteStreamAndConfiguration> newCatalogPerStream = extractStreamAndConfigPerStreamDescriptor(newCatalog);

transformations.forEach(transformation -> {
StreamDescriptor streamDescriptor = transformation.getStreamDescriptor();
final StreamDescriptor streamDescriptor = transformation.getStreamDescriptor();
switch (transformation.getTransformType()) {
case UPDATE_STREAM -> oldCatalogPerStream.get(streamDescriptor)
.stream(newCatalogPerStream.get(streamDescriptor).getStream());
case ADD_STREAM -> oldCatalogPerStream.put(streamDescriptor, newCatalogPerStream.get(streamDescriptor));
case REMOVE_STREAM -> oldCatalogPerStream.remove(streamDescriptor);
case ADD_STREAM -> {
if (nonBreakingChangesPreference.equals(NonBreakingChangesPreference.PROPAGATE_FULLY)) {
oldCatalogPerStream.put(streamDescriptor, newCatalogPerStream.get(streamDescriptor));
}
}
case REMOVE_STREAM -> {
if (nonBreakingChangesPreference.equals(NonBreakingChangesPreference.PROPAGATE_FULLY)) {
oldCatalogPerStream.remove(streamDescriptor);
}
}
default -> throw new NotSupportedException("Not supported transformation.");
}
});
Expand All @@ -54,7 +65,7 @@ public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
}

@VisibleForTesting
static Map<StreamDescriptor, AirbyteStreamAndConfiguration> extractStreamAndConfigPerStreamDescriptor(AirbyteCatalog catalog) {
static Map<StreamDescriptor, AirbyteStreamAndConfiguration> extractStreamAndConfigPerStreamDescriptor(final AirbyteCatalog catalog) {
return catalog.getStreams().stream().collect(Collectors.toMap(
airbyteStreamAndConfiguration -> new StreamDescriptor().name(airbyteStreamAndConfiguration.getStream().getName())
.namespace(airbyteStreamAndConfiguration.getStream().getNamespace()),
Expand Down
Loading

0 comments on commit 46a2ce5

Please sign in to comment.