Skip to content

Commit

Permalink
revert: "feat: add internal api to support field hashing" (#13859)
Browse files (browse the repository at this point in the history)
  • Loading branch information
benmoriceau committed Sep 6, 2024
1 parent bf799dd commit 6cca28f
Show file tree
Hide file tree
Showing 28 changed files with 83 additions and 617 deletions.
3 changes: 0 additions & 3 deletions airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10538,9 +10538,6 @@ components:
selectedFields:
description: This must be set if `fieldSelectedEnabled` is set. An empty list indicates that no properties will be included.
$ref: "#/components/schemas/SelectedFields"
hashedFields:
description: Fields that should be hashed before being written to the destination.
$ref: "#/components/schemas/SelectedFields"
minimumGenerationId:
type: integer
format: int64
Expand Down
1 change: 0 additions & 1 deletion airbyte-commons-converters/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ dependencies {
implementation(project(":oss:airbyte-commons"))
implementation(project(":oss:airbyte-config:config-models"))
implementation(project(":oss:airbyte-json-validation"))
implementation(project(":oss:airbyte-mappers"))
implementation(libs.airbyte.protocol)
implementation(libs.guava)
implementation(libs.slf4j.api)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.client.model.generated.AirbyteStreamConfiguration;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.text.Names;
import io.airbyte.config.ConfiguredAirbyteStream;
import io.airbyte.config.ConfiguredMapper;
import io.airbyte.config.helpers.FieldGenerator;
import io.airbyte.mappers.helpers.MapperHelperKt;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand All @@ -32,9 +27,6 @@
*/
public class CatalogClientConverters {

// TODO(pedro): This should be refactored to use dependency injection.
private static final FieldGenerator fieldGenerator = new FieldGenerator();

/**
* Convert to api model to airbyte protocol model.
*
Expand Down Expand Up @@ -195,34 +187,18 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
.withIsResumable(stream.isResumable());
}

private static List<ConfiguredMapper> toConfiguredHashingMappers(final @Nullable List<SelectedFieldInfo> hashedFields) {
if (hashedFields == null) {
return Collections.emptyList();
}

// FIXME(pedro): See https://github.com/airbytehq/airbyte-internal-issues/issues/9718
// We shouldn't have to rebuild these here, and can potentially lead to losing configuration that's
// actually stored in the db.
return hashedFields.stream().map(f -> MapperHelperKt.createHashingMapper(f.getFieldPath().getFirst()) // We don't support nested fields for now.
).toList();
}

private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
final AirbyteStreamConfiguration config)
throws JsonValidationException {
final var convertedStream = toStreamInternal(stream, config);
return new ConfiguredAirbyteStream.Builder()
.stream(convertedStream)
.syncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class))
.destinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.primaryKey(config.getPrimaryKey())
.cursorField(config.getCursorField())
.generationId(config.getGenerationId())
.minimumGenerationId(config.getMinimumGenerationId())
.syncId(config.getSyncId())
.fields(fieldGenerator.getFieldsFromSchema(convertedStream.getJsonSchema()))
.mappers(toConfiguredHashingMappers(config.getHashedFields()))
.build();
return new ConfiguredAirbyteStream(
toStreamInternal(stream, config),
Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class),
Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.withPrimaryKey(config.getPrimaryKey())
.withCursorField(config.getCursorField())
.withGenerationId(config.getGenerationId())
.withMinimumGenerationId(config.getMinimumGenerationId())
.withSyncId(config.getSyncId());
}

/**
Expand Down Expand Up @@ -254,7 +230,6 @@ private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration
null,
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@

import com.google.common.collect.Lists;
import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.client.model.generated.SelectedFieldInfo;
import io.airbyte.commons.text.Names;
import io.airbyte.config.ConfiguredAirbyteCatalog;
import io.airbyte.config.helpers.FieldGenerator;
import io.airbyte.mappers.helpers.MapperHelperKt;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -26,7 +22,6 @@

class CatalogClientConvertersTest {

private static final FieldGenerator fieldGenerator = new FieldGenerator();
public static final String ID_FIELD_NAME = "id";
private static final String STREAM_NAME = "users-data";
private static final AirbyteStream STREAM = new AirbyteStream()
Expand Down Expand Up @@ -62,7 +57,6 @@ class CatalogClientConvertersTest {
null,
null,
null,
null,
null);

private static final AirbyteCatalog BASIC_MODEL_CATALOG = new AirbyteCatalog().withStreams(
Expand All @@ -87,38 +81,6 @@ void testConvertToProtocol() {
CatalogClientConverters.toAirbyteProtocol(EXPECTED_CLIENT_CATALOG));
}

@Test
void testConvertInternalWithMapping() {
final var streamConfig = new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration(
io.airbyte.api.client.model.generated.SyncMode.FULL_REFRESH,
io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND,
List.of(ID_FIELD_NAME),
List.of(),
Names.toAlphanumericAndUnderscore(STREAM_NAME),
true,
null,
null,
null,
List.of(new SelectedFieldInfo(List.of(ID_FIELD_NAME))),
null,
null,
null);
final io.airbyte.api.client.model.generated.AirbyteCatalog clientCatalog =
new io.airbyte.api.client.model.generated.AirbyteCatalog(
List.of(
new io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration(
CLIENT_STREAM,
streamConfig)));

final ConfiguredAirbyteCatalog configuredCatalog = CatalogClientConverters.toConfiguredAirbyteInternal(clientCatalog);
final var stream = configuredCatalog.getStreams().getFirst();
assertEquals(STREAM_NAME, stream.getStream().getName());
assertEquals(1, stream.getFields().size());
assertEquals(fieldGenerator.getFieldsFromSchema(stream.getStream().getJsonSchema()), stream.getFields());
assertEquals(1, stream.getMappers().size());
assertEquals(MapperHelperKt.createHashingMapper(ID_FIELD_NAME), stream.getMappers().getFirst());
}

@Test
void testIsResumableImport() {
final List<Boolean> boolValues = new ArrayList<>(List.of(Boolean.TRUE, Boolean.FALSE));
Expand Down Expand Up @@ -158,7 +120,6 @@ void testIsResumableExport() {
null,
null,
null,
null,
null);
final var streamAndConf = new AirbyteStreamAndConfiguration(stream, conf);
final List<AirbyteStreamAndConfiguration> streams = List.of(streamAndConf);
Expand Down
1 change: 0 additions & 1 deletion airbyte-commons-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ dependencies {
implementation(project(":oss:airbyte-config:specs"))
implementation(project(":oss:airbyte-data"))
implementation(project(":oss:airbyte-featureflag"))
implementation(project(":oss:airbyte-mappers"))
implementation(project(":oss:airbyte-metrics:metrics-lib"))
implementation(project(":oss:airbyte-db:db-lib"))
implementation(project(":oss:airbyte-json-validation"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
import io.airbyte.featureflag.Workspace;
import io.airbyte.mappers.helpers.MapperHelperKt;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClientFactory;
Expand Down Expand Up @@ -264,10 +263,7 @@ private void applyPatchToStandardSync(final StandardSync sync, final ConnectionU
validateCatalogDoesntContainDuplicateStreamNames(patch.getSyncCatalog());
validateCatalogSize(patch.getSyncCatalog(), workspaceId, "update");

final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(patch.getSyncCatalog());
MapperHelperKt.validateConfiguredMappers(configuredCatalog);

sync.setCatalog(configuredCatalog);
sync.setCatalog(CatalogConverter.toConfiguredInternal(patch.getSyncCatalog()));
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
}

Expand Down Expand Up @@ -556,9 +552,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
validateCatalogDoesntContainDuplicateStreamNames(connectionCreate.getSyncCatalog());
validateCatalogSize(connectionCreate.getSyncCatalog(), workspaceId, "create");

final ConfiguredAirbyteCatalog configuredCatalog = CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog());
MapperHelperKt.validateConfiguredMappers(configuredCatalog);
standardSync.withCatalog(configuredCatalog);
standardSync.withCatalog(CatalogConverter.toConfiguredInternal(connectionCreate.getSyncCatalog()));
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
} else {
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
Expand Down Expand Up @@ -693,7 +687,7 @@ private Builder<String, Object> generateMetadata(final StandardSync standardSync
return metadata;
}

public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, final String updateReason, final Boolean autoUpdate)
public ConnectionRead updateConnection(final ConnectionUpdate connectionPatch, String updateReason, Boolean autoUpdate)
throws ConfigNotFoundException, IOException, JsonValidationException {

final UUID connectionId = connectionPatch.getConnectionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,10 @@
import io.airbyte.config.ActorCatalogFetchEvent;
import io.airbyte.config.ActorDefinitionVersion;
import io.airbyte.config.ConfiguredAirbyteCatalog;
import io.airbyte.config.Field;
import io.airbyte.config.JobStatusSummary;
import io.airbyte.config.RefreshStream.RefreshType;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.helpers.FieldGenerator;
import io.airbyte.config.persistence.ActorDefinitionVersionHelper;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -112,7 +110,6 @@ public class WebBackendConnectionsHandler {
private final ConnectionService connectionService;
private final ActorDefinitionVersionHelper actorDefinitionVersionHelper;
private final FeatureFlagClient featureFlagClient;
private final FieldGenerator fieldGenerator;

public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDefinitionVersionHandler,
final ConnectionsHandler connectionsHandler,
Expand All @@ -126,7 +123,6 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
final ConfigRepository configRepositoryDoNotUse,
final ConnectionService connectionService,
final ActorDefinitionVersionHelper actorDefinitionVersionHelper,
final FieldGenerator fieldGenerator,
final FeatureFlagClient featureFlagClient) {
this.actorDefinitionVersionHandler = actorDefinitionVersionHandler;
this.connectionsHandler = connectionsHandler;
Expand All @@ -140,7 +136,6 @@ public WebBackendConnectionsHandler(final ActorDefinitionVersionHandler actorDef
this.configRepositoryDoNotUse = configRepositoryDoNotUse;
this.connectionService = connectionService;
this.actorDefinitionVersionHelper = actorDefinitionVersionHelper;
this.fieldGenerator = fieldGenerator;
this.featureFlagClient = featureFlagClient;
}

Expand Down Expand Up @@ -484,9 +479,9 @@ private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceI
* catalog
*/
@VisibleForTesting
protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
final AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
protected static AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final AirbyteCatalog originalConfigured,
final AirbyteCatalog originalDiscovered,
final AirbyteCatalog discovered) {
/*
* We can't directly use s.getStream() as the key, because it contains a bunch of other fields, so
* we just define a quick-and-dirty record class.
Expand Down Expand Up @@ -541,17 +536,6 @@ protected AirbyteCatalog updateSchemaWithRefreshedDiscoveredCatalog(final Airbyt
outputStreamConfig.setSuggested(originalConfiguredStream.getConfig().getSuggested());
outputStreamConfig.setFieldSelectionEnabled(originalStreamConfig.getFieldSelectionEnabled());

// Add hashed field configs that are still present in the schema
if (originalStreamConfig.getHashedFields() != null && !originalStreamConfig.getHashedFields().isEmpty()) {
final List<String> discoveredFields =
fieldGenerator.getFieldsFromSchema(stream.getJsonSchema()).stream().map(Field::getName).toList();
for (final SelectedFieldInfo hashedField : originalStreamConfig.getHashedFields()) {
if (discoveredFields.contains(hashedField.getFieldPath().getFirst())) {
outputStreamConfig.addHashedFieldsItem(hashedField);
}
}
}

if (outputStreamConfig.getFieldSelectionEnabled()) {
// TODO(mfsiega-airbyte): support nested fields.
// If field selection is enabled, populate the selected fields.
Expand Down
Loading

0 comments on commit 6cca28f

Please sign in to comment.