diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 592f56518c62..21af162630d3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -346,7 +346,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti * constructs a full picture of all existing configured + all new / updated streams in the newest * catalog. */ - syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(), refreshedCatalog.get().getCatalog()); + syncCatalog = updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(), + refreshedCatalog.get().getCatalog()); /* * Diffing the catalog used to make the configured catalog gives us the clearest diff between the * schema when the configured catalog was made and now. In the case where we do not have the @@ -360,7 +361,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti connection.setStatus(refreshedCatalog.get().getConnectionStatus()); } else if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // reconstructs a full picture of the full schema at the time the catalog was configured. - syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(), catalogUsedToMakeConfiguredCatalog.get()); + syncCatalog = updateSchemaWithOriginalDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get()); // diff not relevant if there was no refresh. diff = null; } else { @@ -374,6 +375,11 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff); } + private AirbyteCatalog updateSchemaWithOriginalDiscoveredCatalog(AirbyteCatalog configuredCatalog, AirbyteCatalog originalDiscoveredCatalog) { + // We pass the original discovered catalog in as the "new" discovered catalog. + return updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, originalDiscoveredCatalog, originalDiscoveredCatalog); + } + private Optional getRefreshedSchema(final UUID sourceId, final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody() @@ -397,9 +403,9 @@ private Optional getRefreshedSchema(final UUID sourceI * catalog */ @VisibleForTesting - protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog originalConfigured, - AirbyteCatalog originalDiscovered, - final AirbyteCatalog discovered) { + protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured, + AirbyteCatalog originalDiscovered, + final AirbyteCatalog discovered) { /* * We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so * we just define a quick-and-dirty record class. @@ -529,8 +535,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne .getConnectionAirbyteCatalog(connectionId); if (catalogUsedToMakeConfiguredCatalog.isPresent()) { // Update the Catalog returned to include all streams, including disabled ones - final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get(), - catalogUsedToMakeConfiguredCatalog.get()); + final AirbyteCatalog syncCatalog = + updateSchemaWithRefreshedDiscoveredCatalog(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get(), + catalogUsedToMakeConfiguredCatalog.get()); updatedConnectionRead.setSyncCatalog(syncCatalog); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 3c88aababd53..94935832373d 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -759,7 +759,8 @@ void testUpdateConnection() throws JsonValidationException, ConfigNotFoundExcept when(connectionsHandler.getConnectionAirbyteCatalog(connectionRead.getConnectionId())).thenReturn(Optional.ofNullable(fullAirbyteCatalog)); final AirbyteCatalog expectedCatalogReturned = - WebBackendConnectionsHandler.updateSchemaWithDiscovery(expected.getSyncCatalog(), expected.getSyncCatalog(), fullAirbyteCatalog); + WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(expected.getSyncCatalog(), expected.getSyncCatalog(), + fullAirbyteCatalog); final WebBackendConnectionRead connectionRead = wbHandler.webBackendUpdateConnection(updateBody); assertEquals(expectedCatalogReturned, connectionRead.getSyncCatalog()); @@ -1076,7 +1077,7 @@ void testUpdateSchemaWithDiscoveryFromEmpty() { .aliasName(STREAM1) .setSelected(false); - final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered); + final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered); assertEquals(expected, actual); } @@ -1126,7 +1127,7 @@ void testUpdateSchemaWithDiscoveryResetStream() { .aliasName(STREAM1) .setSelected(false); - final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered); + final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered); assertEquals(expected, actual); } @@ -1203,7 +1204,7 @@ void testUpdateSchemaWithDiscoveryMergeNewStream() { .setSelected(false); expected.getStreams().add(expectedNewStream); - final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered); + final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered); assertEquals(expected, actual); } @@ -1251,7 +1252,7 @@ void testUpdateSchemaWithNamespacedStreams() { .aliasName(STREAM1) .setSelected(false); - final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, original, discovered); + final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithRefreshedDiscoveredCatalog(original, original, discovered); assertEquals(expected, actual); }