Skip to content

Commit

Permalink
Do not attempt to update streams that are not in the sync catalog (#7…
Browse files Browse the repository at this point in the history
…284)
  • Loading branch information
malikdiarra committed Jun 15, 2023
1 parent 5a61df2 commit 9006ece
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,20 @@ public void applySchemaChangeForSource(final SourceAutoPropagateChange sourceAut
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog syncCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog),
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(syncCatalog),
sourceAutoPropagateChange.getCatalog(),
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));
CatalogConverter.toConfiguredProtocol(syncCatalog));

final ConnectionUpdate updateObject =
new ConnectionUpdate().connectionId(connectionRead.getConnectionId());

if (shouldAutoPropagate(diff, sourceAutoPropagateChange.getWorkspaceId(), connectionRead)) {
applySchemaChange(sourceAutoPropagateChange.getWorkspaceId(),
updateObject,
currentAirbyteCatalog,
syncCatalog,
sourceAutoPropagateChange.getCatalog(),
diff.getTransforms(),
sourceAutoPropagateChange.getCatalogId(),
Expand Down Expand Up @@ -611,15 +611,15 @@ private boolean shouldAutoPropagate(final CatalogDiff diff, final UUID workspace

private void applySchemaChange(final UUID connectionId,
final ConnectionUpdate updateObject,
final io.airbyte.api.model.generated.AirbyteCatalog currentAirbyteCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog currentSyncCatalog,
final io.airbyte.api.model.generated.AirbyteCatalog newCatalog,
final List<StreamTransform> transformations,
final UUID sourceCatalogId,
final NonBreakingChangesPreference nonBreakingChangesPreference) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.SCHEMA_CHANGE_AUTO_PROPAGATED, 1,
new MetricAttribute(MetricTags.CONNECTION_ID, connectionId.toString()));
final io.airbyte.api.model.generated.AirbyteCatalog catalog = getUpdatedSchema(
currentAirbyteCatalog,
currentSyncCatalog,
newCatalog,
transformations,
nonBreakingChangesPreference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ public static AirbyteCatalog getUpdatedSchema(final AirbyteCatalog oldCatalog,
final StreamDescriptor streamDescriptor = transformation.getStreamDescriptor();
switch (transformation.getTransformType()) {
case UPDATE_STREAM -> {
if (!oldCatalogPerStream.containsKey(streamDescriptor)) {
LOGGER.error("Attempting to update a stream that does not exist in the old catalog: {}", streamDescriptor);
if (oldCatalogPerStream.containsKey(streamDescriptor)) {
oldCatalogPerStream.get(streamDescriptor)
.stream(newCatalogPerStream.get(streamDescriptor).getStream());
}
oldCatalogPerStream.get(streamDescriptor)
.stream(newCatalogPerStream.get(streamDescriptor).getStream());
}
case ADD_STREAM -> {
if (nonBreakingChangesPreference.equals(NonBreakingChangesPreference.PROPAGATE_FULLY)) {
Expand Down

0 comments on commit 9006ece

Please sign in to comment.