Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination S3: support anyOf, allOf, and oneOf #4613

Merged
merged 8 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum JsonSchemaType {
BOOLEAN("boolean", true, Schema.Type.BOOLEAN),
NULL("null", true, Schema.Type.NULL),
OBJECT("object", false, Schema.Type.RECORD),
ARRAY("array", false, Schema.Type.ARRAY);
ARRAY("array", false, Schema.Type.ARRAY),
COMBINED("combined", false, Schema.Type.UNION);

private final String jsonSchemaType;
private final boolean isPrimitive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -34,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,23 +66,46 @@ public class JsonToAvroSchemaConverter {

private final Map<String, String> standardizedNames = new HashMap<>();

static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode typeProperty) {
return getTypes(fieldName, typeProperty).stream()
static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode fieldDefinition) {
return getTypes(fieldName, fieldDefinition).stream()
.filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList());
}

static List<JsonSchemaType> getTypes(String fieldName, JsonNode typeProperty) {
if (typeProperty == null) {
static List<JsonSchemaType> getTypes(String fieldName, JsonNode fieldDefinition) {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
return Collections.singletonList(JsonSchemaType.COMBINED);
}

JsonNode typeProperty = fieldDefinition.get("type");
if (typeProperty == null || typeProperty.isNull()) {
throw new IllegalStateException(String.format("Field %s has no type", fieldName));
} else if (typeProperty.isArray()) {
}

if (typeProperty.isArray()) {
return MoreIterators.toList(typeProperty.elements()).stream()
.map(s -> JsonSchemaType.fromJsonSchemaType(s.asText()))
.collect(Collectors.toList());
} else if (typeProperty.isTextual()) {
}

if (typeProperty.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
} else {
throw new IllegalStateException("Unexpected type: " + typeProperty);
}

throw new IllegalStateException("Unexpected type: " + typeProperty);
}

static Optional<JsonNode> getCombinedRestriction(JsonNode fieldDefinition) {
if (fieldDefinition.has("anyOf")) {
return Optional.of(fieldDefinition.get("anyOf"));
}
if (fieldDefinition.has("allOf")) {
return Optional.of(fieldDefinition.get("allOf"));
}
if (fieldDefinition.has("oneOf")) {
return Optional.of(fieldDefinition.get("oneOf"));
}
return Optional.empty();
}

public Map<String, String> getStandardizedNames() {
Expand Down Expand Up @@ -141,33 +166,27 @@ public Schema getAvroSchema(JsonNode jsonSchema,
return assembler.endRecord();
}

Schema getSingleFieldType(String fieldName,
JsonSchemaType fieldType,
JsonNode fieldDefinition,
boolean canBeComposite) {
Schema getSingleFieldType(String fieldName, JsonSchemaType fieldType, JsonNode fieldDefinition) {
Preconditions
.checkState(fieldType != JsonSchemaType.NULL, "Null types should have been filtered out");
Preconditions
.checkState(canBeComposite || fieldType.isPrimitive(), "Field %s has invalid type %s",
fieldName, fieldType);

Schema fieldSchema;
switch (fieldType) {
case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case COMBINED -> {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
fieldSchema = Schema.createUnion(unionTypes);
}
case ARRAY -> {
JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);

if (items.isObject()) {
fieldSchema = Schema
.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
List<Schema> arrayElementTypes = MoreIterators.toList(items.elements())
.stream()
.flatMap(itemDefinition -> getNonNullTypes(fieldName, itemDefinition.get("type")).stream()
.map(type -> getSingleFieldType(fieldName, type, itemDefinition, false)))
.distinct()
.collect(Collectors.toList());
arrayElementTypes.add(0, Schema.create(Schema.Type.NULL));
List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items);
arrayElementTypes.add(0, Schema.create(Type.NULL));
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
throw new IllegalStateException(
Expand All @@ -181,15 +200,30 @@ Schema getSingleFieldType(String fieldName,
return fieldSchema;
}

List<Schema> getSchemasFromTypes(String fieldName, ArrayNode types) {
return MoreIterators.toList(types.elements())
.stream()
.flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> {
Schema singleFieldSchema = getSingleFieldType(fieldName, type, definition);
if (singleFieldSchema.isUnion()) {
return singleFieldSchema.getTypes().stream();
} else {
return Stream.of(singleFieldSchema);
}
}))
.distinct()
.collect(Collectors.toList());
}

/**
* @param fieldDefinition - Json schema field definition. E.g. { type: "number" }.
*/
Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) {
// Filter out null types, which will be added back in the end.
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition.get("type"))
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition)
.stream()
.flatMap(fieldType -> {
Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true);
Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition);
if (singleFieldSchema.isUnion()) {
return singleFieldSchema.getTypes().stream();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
Expand All @@ -47,15 +48,27 @@ public void testGetSingleTypes() {
JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }");
assertEquals(
Collections.singletonList(JsonSchemaType.NUMBER),
JsonToAvroSchemaConverter.getTypes("field", input1.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input1));
}

@Test
public void testGetUnionTypes() {
JsonNode input2 = Jsons.deserialize("{ \"type\": [\"null\", \"string\"] }");
assertEquals(
Lists.newArrayList(JsonSchemaType.NULL, JsonSchemaType.STRING),
JsonToAvroSchemaConverter.getTypes("field", input2.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input2));
}

@Test
public void testNoCombinedRestriction() {
JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }");
assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input1).isEmpty());
}

@Test
public void testWithCombinedRestriction() {
JsonNode input2 = Jsons.deserialize("{ \"anyOf\": [{ \"type\": \"string\" }, { \"type\": \"integer\" }] }");
assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input2).isPresent());
}

public static class GetFieldTypeTestCaseProvider implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,132 @@
}
]
}
},
{
"schemaName": "field_with_combined_restriction",
"namespace": "namespace8",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"created_at": {
tuliren marked this conversation as resolved.
Show resolved Hide resolved
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": ["null", "string"]
},
{
"type": "integer"
}
]
}
}
},
"avroSchema": {
"type": "record",
"name": "field_with_combined_restriction",
"namespace": "namespace8",
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"default": null
}
]
}
},
{
"schemaName": "record_with_combined_restriction_field",
"namespace": "namespace9",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"user": {
"type": "object",
"properties": {
"created_at": {
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": ["null", "string"]
},
{
"type": "integer"
}
]
}
}
}
}
},
"avroSchema": {
"type": "record",
"name": "record_with_combined_restriction_field",
"namespace": "namespace9",
"fields": [
{
"name": "user",
"type": [
"null",
{
"type": "record",
"name": "user",
"namespace": "",
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"default": null
}
]
}
],
"default": null
}
]
}
},
{
"schemaName": "array_with_combined_restriction_field",
"namespace": "namespace10",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"identifiers": {
"type": "array",
"items": [
{
"oneOf": [{ "type": "integer" }, { "type": "string" }]
},
{
"type": "boolean"
}
]
}
}
},
"avroSchema": {
"type": "record",
"name": "array_with_combined_restriction_field",
"namespace": "namespace10",
"fields": [
{
"name": "identifiers",
"type": [
"null",
{
"type": "array",
"items": ["null", "int", "string", "boolean"]
}
],
"default": null
}
]
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,26 @@
]
}
]
},
{
"fieldName": "any_of_field",
"jsonFieldSchema": {
"anyOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "all_of_field",
"jsonFieldSchema": {
"allOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "one_of_field",
"jsonFieldSchema": {
"oneOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
}
]
Loading