Skip to content

Commit

Permalink
Merge branch 'master' into edgao/revert_normalization_protocol_v1
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Feb 2, 2023
2 parents 866358b + aa5ed6d commit e54ccaf
Show file tree
Hide file tree
Showing 21 changed files with 183 additions and 79 deletions.
2 changes: 1 addition & 1 deletion airbyte-bootloader/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ airbyte:
target:
range:
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}
secret:
persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE}
store:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public class MigrationContainer<T extends Migration> {

private final List<T> migrationsToRegister;
private final SortedMap<String, T> migrations = new TreeMap<>();
private String mostRecentMajorVersion = "";

// mostRecentMajorVersion defaults to v0 as no migration is required
private String mostRecentMajorVersion = "0";

public MigrationContainer(final List<T> migrations) {
this.migrationsToRegister = migrations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
import io.airbyte.validation.json.JsonSchemaValidator;
import jakarta.inject.Singleton;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

@Singleton
// Disable V1 Migration, uncomment to re-enable
// @Singleton
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> {

private final JsonSchemaValidator validator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) {
return false;
}

/**
 * Downgrades the stream schemas of the given configured catalog from v1 to v0 data types, in
 * place, but only when the catalog is detected to contain v1 data types.
 *
 * <p>
 * A {@code null} catalog is reported as containing no v1 data types and is therefore left alone.
 *
 * @param configuredAirbyteCatalog catalog to migrate in place
 */
public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  if (!containsV1DataTypes(configuredAirbyteCatalog)) {
    // Nothing to migrate: no v1 data type was detected.
    return;
  }
  downgradeSchema(configuredAirbyteCatalog);
}

/**
 * Downgrades the stream schemas of the given catalog from v1 to v0 data types, in place, but only
 * when the catalog is detected to contain v1 data types.
 *
 * <p>
 * A {@code null} catalog is reported as containing no v1 data types and is therefore left alone.
 *
 * @param airbyteCatalog catalog to migrate in place
 */
public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
  if (!containsV1DataTypes(airbyteCatalog)) {
    // Nothing to migrate: no v1 data type was detected.
    return;
  }
  downgradeSchema(airbyteCatalog);
}

/**
 * Unconditionally downgrades, in place, the JSON schema of every stream wrapped by the configured
 * catalog from v1 to v0. The per-schema work is delegated to
 * {@link SchemaMigrationV1#downgradeSchema}.
 *
 * @param configuredAirbyteCatalog catalog whose stream schemas are migrated in place
 */
private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  for (final ConfiguredAirbyteStream configuredStream : configuredAirbyteCatalog.getStreams()) {
    final JsonNode jsonSchema = configuredStream.getStream().getJsonSchema();
    SchemaMigrationV1.downgradeSchema(jsonSchema);
  }
}

/**
 * Unconditionally downgrades, in place, the JSON schema of every stream of the catalog from v1 to
 * v0. The per-schema work is delegated to {@link SchemaMigrationV1#downgradeSchema}.
 *
 * @param airbyteCatalog catalog whose stream schemas are migrated in place
 */
private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
  for (final AirbyteStream airbyteStream : airbyteCatalog.getStreams()) {
    final JsonNode jsonSchema = airbyteStream.getJsonSchema();
    SchemaMigrationV1.downgradeSchema(jsonSchema);
  }
}

/**
 * Returns true if any stream of the configured catalog contains v1 data types.
 *
 * <p>
 * Every stream is inspected rather than only the first one, so a catalog whose leading stream
 * carries no v1 declaration (but a later one does) is still detected.
 *
 * @param configuredAirbyteCatalog catalog to inspect, may be {@code null}
 * @return {@code true} when at least one stream schema contains a v1 data type; {@code false} for
 *         a {@code null} catalog, an empty stream list, or when no stream contains one
 */
private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  if (configuredAirbyteCatalog == null) {
    return false;
  }

  return configuredAirbyteCatalog
      .getStreams()
      .stream()
      // A configured stream without an underlying stream maps to null, which
      // streamContainsV1DataTypes reports as "no v1 data types".
      .map(ConfiguredAirbyteStream::getStream)
      .anyMatch(CatalogMigrationV1Helper::streamContainsV1DataTypes);
}

/**
 * Returns true if any stream of the catalog contains v1 data types.
 *
 * <p>
 * Every stream is inspected rather than only the first one, so a catalog whose leading stream
 * carries no v1 declaration (but a later one does) is still detected.
 *
 * @param airbyteCatalog catalog to inspect, may be {@code null}
 * @return {@code true} when at least one stream schema contains a v1 data type; {@code false} for
 *         a {@code null} catalog, an empty stream list, or when no stream contains one
 */
private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
  if (airbyteCatalog == null) {
    return false;
  }

  return airbyteCatalog
      .getStreams()
      .stream()
      .anyMatch(CatalogMigrationV1Helper::streamContainsV1DataTypes);
}

/**
 * Returns true if the JSON schema of the given stream contains a v1 data type.
 *
 * @param airbyteStream stream to inspect, may be {@code null}
 * @return {@code false} when the stream or its JSON schema is {@code null}; otherwise the result
 *         of searching the schema for a v1 data type
 */
private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
  // Short-circuiting keeps the null checks ahead of the schema search.
  return airbyteStream != null
      && airbyteStream.getJsonSchema() != null
      && hasV1DataType(airbyteStream.getJsonSchema());
}

/**
 * Recursively searches the schema for a v1 data type node, stopping at the first one found.
 *
 * <p>
 * A node counts as v1 when {@link SchemaMigrationV1#isPrimitiveReferenceTypeDeclaration} accepts
 * it; otherwise the subschemas returned by {@link SchemaMigrations#findSubschemas} are searched in
 * turn.
 *
 * @param schema schema node to inspect
 * @return {@code true} if the node itself or any nested subschema is a v1 data type declaration
 */
private static boolean hasV1DataType(final JsonNode schema) {
  boolean found = SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema);
  if (!found) {
    for (final JsonNode child : SchemaMigrations.findSubschemas(schema)) {
      if (hasV1DataType(child)) {
        found = true;
        // First match is enough; skip the remaining subschemas.
        break;
      }
    }
  }
  return found;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import jakarta.inject.Singleton;

@Singleton
// Disable V1 Migration, uncomment to re-enable
// @Singleton
public class ConfiguredAirbyteCatalogMigrationV1
implements ConfiguredAirbyteCatalogMigration<io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, ConfiguredAirbyteCatalog> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
* Detects any schema that looks like a reference type declaration, e.g.: { "$ref":
* "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] }
*/
private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
if (!schema.isObject()) {
// Non-object schemas (i.e. true/false) never need to be modified
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.commons.protocol.serde;

import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import jakarta.inject.Singleton;

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MigratorsMicronautTest {

// This should contain the list of all the supported majors of the airbyte protocol except the most
// recent one since the migrations themselves are keyed on the lower version.
final Set<String> SUPPORTED_VERSIONS = Set.of("0");
final Set<String> SUPPORTED_VERSIONS = Set.of();

@Test
void testAirbyteMessageMigrationInjection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
import io.airbyte.commons.protocol.ConfiguredAirbyteCatalogMigrator;
import io.airbyte.commons.protocol.migrations.v1.AirbyteMessageMigrationV1;
import io.airbyte.commons.protocol.migrations.v1.ConfiguredAirbyteCatalogMigrationV1;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Deserializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV0Serializer;
import io.airbyte.commons.protocol.serde.AirbyteMessageV1Deserializer;
Expand Down Expand Up @@ -45,10 +43,12 @@ void beforeEach() {
List.of(new AirbyteMessageV0Serializer(), new AirbyteMessageV1Serializer())));
serDeProvider.initialize();
final AirbyteMessageMigrator airbyteMessageMigrator = new AirbyteMessageMigrator(
List.of(new AirbyteMessageMigrationV1()));
// TODO once data types v1 is re-enabled, this test should contain the migration
List.of(/* new AirbyteMessageMigrationV1() */));
airbyteMessageMigrator.initialize();
final ConfiguredAirbyteCatalogMigrator configuredAirbyteCatalogMigrator = new ConfiguredAirbyteCatalogMigrator(
List.of(new ConfiguredAirbyteCatalogMigrationV1()));
// TODO once data types v1 is re-enabled, this test should contain the migration
List.of(/* new ConfiguredAirbyteCatalogMigrationV1() */));
configuredAirbyteCatalogMigrator.initialize();
migratorFactory = spy(new AirbyteProtocolVersionedMigratorFactory(airbyteMessageMigrator, configuredAirbyteCatalogMigrator));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public class EnvConfigs implements Configs {
private static final long DEFAULT_MAX_SYNC_WORKERS = 5;
private static final long DEFAULT_MAX_NOTIFY_WORKERS = 5;
private static final String DEFAULT_NETWORK = "host";
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("1.0.0");
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MAX = new Version("0.3.0");
private static final Version DEFAULT_AIRBYTE_PROTOCOL_VERSION_MIN = new Version("0.0.0");
private static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
private static final String APPLY_FIELD_SELECTION = "APPLY_FIELD_SELECTION";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
private static ConfiguredAirbyteCatalog parseConfiguredAirbyteCatalog(final String configuredAirbyteCatalogString) {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = Jsons.deserialize(configuredAirbyteCatalogString, ConfiguredAirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog);
return configuredAirbyteCatalog;
}

Expand Down Expand Up @@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec
public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) {
final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog);
return airbyteCatalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
configRepository.writeSourceConnectionNoSecrets(source);

final AirbyteCatalog actorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING_V1));
final AirbyteCatalog expectedActorCatalog = CatalogHelpers.createAirbyteCatalog("clothes", Field.of("name", JsonSchemaType.STRING));
configRepository.writeActorCatalogFetchEvent(
actorCatalog, source.getSourceId(), DOCKER_IMAGE_TAG, CONFIG_HASH);

Expand Down Expand Up @@ -201,7 +201,8 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
assertEquals(expectedActorCatalog, Jsons.object(catalogNewConfig.get().getCatalog(), AirbyteCatalog.class));

final int catalogDbEntry2 = database.query(ctx -> ctx.selectCount().from(ACTOR_CATALOG)).fetchOne().into(int.class);
assertEquals(2, catalogDbEntry2);
// TODO this should be 2 once we re-enable datatypes v1
assertEquals(1, catalogDbEntry2);
}

@Test
Expand Down Expand Up @@ -484,13 +485,16 @@ void testGetStandardSyncUsingOperation() throws IOException {
}

private List<StandardSync> copyWithV1Types(final List<StandardSync> syncs) {
return syncs.stream()
.map(standardSync -> {
final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync), StandardSync.class);
copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
return copiedStandardSync;
})
.toList();
return syncs;
// TODO adjust with data types feature flag testing
// return syncs.stream()
// .map(standardSync -> {
// final StandardSync copiedStandardSync = Jsons.deserialize(Jsons.serialize(standardSync),
// StandardSync.class);
// copiedStandardSync.setCatalog(MockData.getConfiguredCatalogWithV1DataTypes());
// return copiedStandardSync;
// })
// .toList();
}

private void assertSyncsMatch(final List<StandardSync> expectedSyncs, final List<StandardSync> actualSyncs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString)
final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
} else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
}
return jobConfig;
}
Expand All @@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString)
final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
} else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
}
return jobOutput;
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-server/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ airbyte:
root: ${WORKSPACE_ROOT}
protocol:
min-version: ${AIRBYTE_PROTOCOL_VERSION_MIN:0.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:1.0.0}
max-version: ${AIRBYTE_PROTOCOL_VERSION_MAX:0.3.0}

temporal:
cloud:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,6 @@ public List<JsonNode> retrieveSourceRecords(final Database database, final Strin
return database.query(context -> context.fetch(String.format("SELECT * FROM %s;", table)))
.stream()
.map(Record::intoMap)
.map(rec -> {
// The protocol requires converting numbers to strings. source-postgres does that internally,
// but we're querying the DB directly, so we have to do it manually.
final Map<String, Object> stringifiedNumbers = new HashMap<>();
for (final String key : rec.keySet()) {
Object o = rec.get(key);
if (o instanceof Number) {
o = o.toString();
}
stringifiedNumbers.put(key, o);
}
return stringifiedNumbers;
})
.map(Jsons::jsonNode)
.collect(Collectors.toList());
}
Expand Down
Loading

0 comments on commit e54ccaf

Please sign in to comment.