Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bmoric/fix diff #16513

Merged
merged 14 commits into from
Sep 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -334,10 +335,16 @@ private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld
final Set<FieldTransform> fieldTransforms = new HashSet<>();
final Map<List<String>, JsonNode> fieldNameToTypeOld = getFullyQualifiedFieldNamesWithTypes(streamOld.getJsonSchema())
.stream()
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
.collect(
HashMap::new,
CatalogHelpers::collectInHashMap,
CatalogHelpers::combineAccumulator);
final Map<List<String>, JsonNode> fieldNameToTypeNew = getFullyQualifiedFieldNamesWithTypes(streamNew.getJsonSchema())
.stream()
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
.collect(
HashMap::new,
CatalogHelpers::collectInHashMap,
CatalogHelpers::combineAccumulator);

Sets.difference(fieldNameToTypeOld.keySet(), fieldNameToTypeNew.keySet())
.forEach(fieldName -> fieldTransforms.add(FieldTransform.createRemoveFieldTransform(fieldName, fieldNameToTypeOld.get(fieldName))));
Expand All @@ -354,4 +361,27 @@ private static UpdateStreamTransform getStreamDiff(final AirbyteStream streamOld
return new UpdateStreamTransform(fieldTransforms);
}

/**
 * Sentinel schema stored in place of a field's real schema when the same fully qualified field
 * name is seen more than once while collecting (see {@code collectInHashMap} and
 * {@code combineAccumulator}). It is a JSON node wrapping the text "Duplicated Schema".
 */
@VisibleForTesting
static final JsonNode DUPLICATED_SCHEMA = Jsons.jsonNode("Duplicated Schema");

/**
 * Accumulator step used with the three-argument {@code Stream.collect}: records one
 * (field path, schema) pair in the map being built.
 *
 * <p>
 * The first time a field path is seen its schema is stored as-is. If the path is already present,
 * the stored value is overwritten with {@link #DUPLICATED_SCHEMA} so that a repeated path never
 * raises a duplicate-key error.
 *
 * @param accumulator map being populated; mutated in place
 * @param value pair whose key is the fully qualified field path and whose value is its schema
 */
@VisibleForTesting
static void collectInHashMap(final Map<List<String>, JsonNode> accumulator, final Pair<List<String>, JsonNode> value) {
  final List<String> fieldPath = value.getKey();
  // Decide what to store before writing: a path that is already known collapses to the sentinel.
  final JsonNode schemaToStore = accumulator.containsKey(fieldPath) ? DUPLICATED_SCHEMA : value.getValue();
  accumulator.put(fieldPath, schemaToStore);
}

/**
 * Combiner step used with the three-argument {@code Stream.collect}: folds every entry of the
 * right-hand partial result into the left-hand one.
 *
 * <p>
 * A key present in both maps is replaced by {@link #DUPLICATED_SCHEMA} in the left map, whatever
 * the two stored values are; a key only present on the right is copied over unchanged.
 *
 * @param accumulatorLeft map receiving the merged content; mutated in place
 * @param accumulatorRight map whose entries are merged into {@code accumulatorLeft}
 */
@VisibleForTesting
static void combineAccumulator(final Map<List<String>, JsonNode> accumulatorLeft, final Map<List<String>, JsonNode> accumulatorRight) {
  for (final Map.Entry<List<String>, JsonNode> incoming : accumulatorRight.entrySet()) {
    final List<String> fieldPath = incoming.getKey();
    final JsonNode merged = accumulatorLeft.containsKey(fieldPath) ? DUPLICATED_SCHEMA : incoming.getValue();
    accumulatorLeft.put(fieldPath, merged);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,22 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.transform_models.FieldTransform;
import io.airbyte.protocol.models.transform_models.StreamTransform;
import io.airbyte.protocol.models.transform_models.StreamTransformType;
import io.airbyte.protocol.models.transform_models.UpdateFieldSchemaTransform;
import io.airbyte.protocol.models.transform_models.UpdateStreamTransform;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import lombok.val;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.elasticsearch.common.collect.Map;
import org.junit.jupiter.api.Test;

@SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert")
class CatalogHelpersTest {

// handy for debugging test only.
Expand All @@ -30,6 +37,9 @@ class CatalogHelpersTest {
private static final String ITEMS = "items";
private static final String SOME_ARRAY = "someArray";
private static final String PROPERTIES = "properties";
private static final String USERS = "users";
private static final String COMPANIES_VALID = "companies_schema.json";
private static final String COMPANIES_INVALID = "companies_schema_invalid.json";

@Test
void testFieldToJsonSchema() {
Expand Down Expand Up @@ -98,17 +108,17 @@ void testGetCatalogDiff() throws IOException {
final JsonNode schema1 = Jsons.deserialize(MoreResources.readResource("valid_schema.json"));
final JsonNode schema2 = Jsons.deserialize(MoreResources.readResource("valid_schema2.json"));
final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName("users").withJsonSchema(schema1),
new AirbyteStream().withName(USERS).withJsonSchema(schema1),
new AirbyteStream().withName("accounts").withJsonSchema(Jsons.emptyObject())));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
new AirbyteStream().withName("users").withJsonSchema(schema2),
new AirbyteStream().withName(USERS).withJsonSchema(schema2),
new AirbyteStream().withName("sales").withJsonSchema(Jsons.emptyObject())));

final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(catalog1, catalog2);
final List<StreamTransform> expectedDiff = Stream.of(
StreamTransform.createAddStreamTransform(new StreamDescriptor().withName("sales")),
StreamTransform.createRemoveStreamTransform(new StreamDescriptor().withName("accounts")),
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName("users"), new UpdateStreamTransform(Set.of(
StreamTransform.createUpdateStreamTransform(new StreamDescriptor().withName(USERS), new UpdateStreamTransform(Set.of(
FieldTransform.createAddFieldTransform(List.of("COD"), schema2.get(PROPERTIES).get("COD")),
FieldTransform.createRemoveFieldTransform(List.of("something2"), schema1.get(PROPERTIES).get("something2")),
FieldTransform.createRemoveFieldTransform(List.of("HKD"), schema1.get(PROPERTIES).get("HKD")),
Expand All @@ -127,7 +137,8 @@ void testGetCatalogDiff() throws IOException {
schema2.get(PROPERTIES).get(SOME_ARRAY).get(ITEMS).get(PROPERTIES).get("newName"))))))
.sorted(STREAM_TRANSFORM_COMPARATOR)
.toList();
assertEquals(expectedDiff, actualDiff.stream().sorted(STREAM_TRANSFORM_COMPARATOR).toList());

Assertions.assertThat(actualDiff).containsAll(expectedDiff);
}

@Test
Expand All @@ -151,4 +162,59 @@ void testExtractIncrementalStreamDescriptors() {
assertEquals("one", streamDescriptors.get(0).getName());
}

/**
 * Smoke test: collecting the fully qualified field names of the valid companies schema with the
 * custom accumulator/combiner must complete without throwing. No assertion is made on the result.
 */
@Test
void testGetFullyQualifiedFieldNamesWithTypes() throws IOException {
  final JsonNode validSchema = Jsons.deserialize(MoreResources.readResource(COMPANIES_VALID));

  CatalogHelpers.getFullyQualifiedFieldNamesWithTypes(validSchema)
      .stream()
      .collect(
          HashMap::new,
          CatalogHelpers::collectInHashMap,
          CatalogHelpers::combineAccumulator);
}

/**
 * Collects the fully qualified field names of the invalid companies schema and expects the path
 * {@code [tags, tags, items]} to be mapped to {@link CatalogHelpers#DUPLICATED_SCHEMA}.
 */
@Test
void testGetFullyQualifiedFieldNamesWithTypesOnInvalidSchema() throws IOException {
  val resultField = CatalogHelpers.getFullyQualifiedFieldNamesWithTypes(
      Jsons.deserialize(MoreResources.readResource(COMPANIES_INVALID))).stream().collect(
          () -> new HashMap<>(),
          CatalogHelpers::collectInHashMap,
          CatalogHelpers::combineAccumulator);

  // containsEntry takes the key and value directly, so the assertion no longer needs a
  // Map.entry(...) factory (which in this file resolved to an Elasticsearch helper class).
  Assertions.assertThat(resultField)
      .containsEntry(List.of("tags", "tags", "items"), CatalogHelpers.DUPLICATED_SCHEMA);
}

/**
 * Diffs a catalog built from the invalid companies schema against one built from the valid schema
 * and expects exactly one transform, of type {@code UPDATE_STREAM}.
 */
@Test
void testGetCatalogDiffWithInvalidSchema() throws IOException {
  final JsonNode invalidSchema = Jsons.deserialize(MoreResources.readResource(COMPANIES_INVALID));
  final JsonNode validSchema = Jsons.deserialize(MoreResources.readResource(COMPANIES_VALID));

  final AirbyteStream oldStream = new AirbyteStream().withName(USERS).withJsonSchema(invalidSchema);
  final AirbyteStream newStream = new AirbyteStream().withName(USERS).withJsonSchema(validSchema);
  final AirbyteCatalog oldCatalog = new AirbyteCatalog().withStreams(List.of(oldStream));
  final AirbyteCatalog newCatalog = new AirbyteCatalog().withStreams(List.of(newStream));

  final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(oldCatalog, newCatalog);

  final Condition<StreamTransform> isStreamUpdate = new Condition<StreamTransform>(
      transform -> transform.getTransformType() == StreamTransformType.UPDATE_STREAM,
      "Check update");
  Assertions.assertThat(actualDiff)
      .hasSize(1)
      .first()
      .has(isStreamUpdate);
}

/**
 * Diffs two catalogs that are both built from the invalid companies schema and expects no
 * transform at all.
 */
@Test
void testGetCatalogDiffWithBothInvalidSchema() throws IOException {
  // The resource is deserialized twice on purpose so each catalog owns a distinct JsonNode.
  final JsonNode oldSchema = Jsons.deserialize(MoreResources.readResource(COMPANIES_INVALID));
  final JsonNode newSchema = Jsons.deserialize(MoreResources.readResource(COMPANIES_INVALID));

  final AirbyteCatalog oldCatalog = new AirbyteCatalog()
      .withStreams(List.of(new AirbyteStream().withName(USERS).withJsonSchema(oldSchema)));
  final AirbyteCatalog newCatalog = new AirbyteCatalog()
      .withStreams(List.of(new AirbyteStream().withName(USERS).withJsonSchema(newSchema)));

  final Set<StreamTransform> actualDiff = CatalogHelpers.getCatalogDiff(oldCatalog, newCatalog);

  Assertions.assertThat(actualDiff).isEmpty();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"type": "object",
"properties": {
"tags": {
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {
"anyOf": [
{
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"type": {
"type": "string"
}
}
},
{ "type": "null" }
]
}
}
}
}
},
"additionalProperties": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"type": "object",
"properties": {
"tags": {
"type": "object",
"properties": {
"tags": {
"type": "array",
"items": {
"type": ["null", "object"],
"anyOf": [
{
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string"
},
"type": {
"type": "string"
}
}
},
{ "type": "null" }
]
}
}
}
}
},
"additionalProperties": false
}