Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update web backend get connection to properly handle field selection and schema refresh #20323

Merged
merged 4 commits into from
Dec 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AllArgsConstructor
@Slf4j
public class WebBackendConnectionsHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(WebBackendConnectionsHandler.class);
private final ConnectionsHandler connectionsHandler;
private final StateHandler stateHandler;
private final SourceHandler sourceHandler;
Expand Down Expand Up @@ -343,7 +346,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* constructs a full picture of all existing configured + all new / updated streams in the newest
* catalog.
*/
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, refreshedCatalog.get().getCatalog());
syncCatalog = updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get(),
refreshedCatalog.get().getCatalog());
/*
* Diffing the catalog used to make the configured catalog gives us the clearest diff between the
* schema when the configured catalog was made and now. In the case where we do not have the
Expand All @@ -357,7 +361,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
connection.setStatus(refreshedCatalog.get().getConnectionStatus());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
syncCatalog = updateSchemaWithOriginalDiscoveredCatalog(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
// diff not relevant if there was no refresh.
diff = null;
} else {
Expand All @@ -371,6 +375,11 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

private AirbyteCatalog updateSchemaWithOriginalDiscoveredCatalog(AirbyteCatalog configuredCatalog, AirbyteCatalog originalDiscoveredCatalog) {
// We pass the original discovered catalog in as the "new" discovered catalog.
return updateSchemaWithRefreshedDiscoveredCatalog(configuredCatalog, originalDiscoveredCatalog, originalDiscoveredCatalog);
}

private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
Expand All @@ -386,30 +395,40 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
* is in the old and new catalog, any configuration that was previously set for users, we add to the
* new catalog.
*
* @param original fully configured, original catalog
* @param originalConfigured fully configured, original catalog
* @param originalDiscovered the original discovered catalog used to make the original configured
* catalog
* @param discovered newly discovered catalog, no configurations set
* @return merged catalog, most up-to-date schema with most up-to-date configurations from old
* catalog
*/
@VisibleForTesting
protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog original, final AirbyteCatalog discovered) {
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
/*
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
* we just define a quick-and-dirty record class.
*/
final Map<Stream, AirbyteStreamAndConfiguration> streamDescriptorToOriginalStream = original.getStreams()
final Map<Stream, AirbyteStreamAndConfiguration> streamDescriptorToOriginalStream = originalConfigured.getStreams()
.stream()
.collect(toMap(s -> new Stream(s.getStream().getName(), s.getStream().getNamespace()), s -> s));
final Map<Stream, AirbyteStreamAndConfiguration> streamDescriptorToOriginalDiscoveredStream = originalDiscovered.getStreams()
.stream()
.collect(toMap(s -> new Stream(s.getStream().getName(), s.getStream().getNamespace()), s -> s));

final List<AirbyteStreamAndConfiguration> streams = new ArrayList<>();

for (final AirbyteStreamAndConfiguration discoveredStream : discovered.getStreams()) {
final AirbyteStream stream = discoveredStream.getStream();
final AirbyteStreamAndConfiguration originalStream = streamDescriptorToOriginalStream.get(new Stream(stream.getName(), stream.getNamespace()));
final AirbyteStreamAndConfiguration originalConfiguredStream = streamDescriptorToOriginalStream.get(
new Stream(stream.getName(), stream.getNamespace()));
final AirbyteStreamAndConfiguration originalDiscoveredStream = streamDescriptorToOriginalDiscoveredStream.get(
new Stream(stream.getName(), stream.getNamespace()));
final AirbyteStreamConfiguration outputStreamConfig;

if (originalStream != null) {
final AirbyteStreamConfiguration originalStreamConfig = originalStream.getConfig();
if (originalConfiguredStream != null) {
final AirbyteStreamConfiguration originalStreamConfig = originalConfiguredStream.getConfig();
final AirbyteStreamConfiguration discoveredStreamConfig = discoveredStream.getConfig();
outputStreamConfig = new AirbyteStreamConfiguration();

Expand All @@ -433,16 +452,29 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o
}

outputStreamConfig.setAliasName(originalStreamConfig.getAliasName());
outputStreamConfig.setSelected(originalStream.getConfig().getSelected());
outputStreamConfig.setSelected(originalConfiguredStream.getConfig().getSelected());

outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());
if (outputStreamConfig.getFieldSelectionEnabled()) {
// TODO(mfsiega-airbyte): support nested fields.
// If field selection is enabled, populate the selected columns.
final List<String> selectedFields = new ArrayList<>();
originalStream.getStream().getJsonSchema().findValue("properties").fieldNames().forEachRemaining((name) -> selectedFields.add(name));
outputStreamConfig.setSelectedFields(
selectedFields.stream().map((fieldName) -> new SelectedFieldInfo().addFieldPathItem(fieldName)).collect(Collectors.toList()));
// If field selection is enabled, populate the selected fields.
final Set<String> originallyDiscovered = new HashSet<>();
final Set<String> refreshDiscovered = new HashSet<>();
// NOTE: by only taking the first element of the path, we're restricting to top-level fields.
final Set<String> originallySelected = new HashSet<>(
originalConfiguredStream.getConfig().getSelectedFields().stream().map((field) -> field.getFieldPath().get(0)).toList());
originalDiscoveredStream.getStream().getJsonSchema().findPath("properties").fieldNames()
.forEachRemaining((name) -> originallyDiscovered.add(name));
stream.getJsonSchema().findPath("properties").fieldNames().forEachRemaining((name) -> refreshDiscovered.add(name));
// We include a selected field if it:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great comments, would be hard to follow without them!

// (is in the newly discovered schema) AND (it was either originally selected OR not in the
// originally discovered schema at all)
// NOTE: this implies that the default behaviour for newly-discovered columns is to add them.
for (final String discoveredField : refreshDiscovered) {
if (originallySelected.contains(discoveredField) || !originallyDiscovered.contains(discoveredField)) {
outputStreamConfig.addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(discoveredField));
}
}
}

} else {
Expand Down Expand Up @@ -503,7 +535,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
.getConnectionAirbyteCatalog(connectionId);
if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// Update the Catalog returned to include all streams, including disabled ones
final AirbyteCatalog syncCatalog = updateSchemaWithDiscovery(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get());
final AirbyteCatalog syncCatalog =
updateSchemaWithRefreshedDiscoveredCatalog(updatedConnectionRead.getSyncCatalog(), catalogUsedToMakeConfiguredCatalog.get(),
catalogUsedToMakeConfiguredCatalog.get());
updatedConnectionRead.setSyncCatalog(syncCatalog);
}

Expand Down
Loading