🐞 Destination S3 & GCS Avro: support array with unknown item type (#9367)

* Support array field with empty items specification

* Remove all exceptions

* Format code

* Bump connector versions

* Bump bigquery versions

* Update docs

* Remove unused code

* Update doc for PR #9363

* Update doc about defaulting all improperly typed fields to string

* Ignore bigquery

* Update version and doc

* Update doc

* Bump version in seed
tuliren authored Jan 12, 2022
1 parent 826f55d commit 5f6785d
Showing 10 changed files with 108 additions and 24 deletions.
@@ -60,7 +60,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.19
+  dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
- name: Google PubSub
@@ -167,7 +167,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.2.2
+  dockerImageTag: 0.2.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
- name: SFTP-JSON
@@ -1143,7 +1143,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
-- dockerImage: "airbyte/destination-gcs:0.1.19"
+- dockerImage: "airbyte/destination-gcs:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -3396,7 +3396,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
-- dockerImage: "airbyte/destination-s3:0.2.2"
+- dockerImage: "airbyte/destination-s3:0.2.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
@@ -3594,8 +3594,6 @@
\ more memory. Allowed values: min=5MB, max=525MB Default: 5MB."
type: "integer"
default: 5
-  minimum: 5
-  maximum: 525
examples:
- 5
- title: "CSV: Comma-Separated Values"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.1.19
+LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/destination-gcs
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=0.2.2
+LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/destination-s3
@@ -51,6 +51,9 @@ static List<JsonSchemaType> getNonNullTypes(final String fieldName, final JsonNo
.filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList());
}

/**
* When no type is specified, it will default to string.
*/
static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fieldDefinition) {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
@@ -59,7 +62,8 @@ static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fiel

final JsonNode typeProperty = fieldDefinition.get("type");
if (typeProperty == null || typeProperty.isNull()) {
-      throw new IllegalStateException(String.format("Field %s has no type", fieldName));
+      LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName);
+      return Collections.singletonList(JsonSchemaType.STRING);
}

if (typeProperty.isArray()) {
@@ -72,7 +76,8 @@ static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fiel
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
}

-    throw new IllegalStateException("Unexpected type: " + typeProperty);
+    LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty);
+    return Collections.singletonList(JsonSchemaType.STRING);
}

static Optional<JsonNode> getCombinedRestriction(final JsonNode fieldDefinition) {
@@ -120,7 +125,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
final SchemaBuilder.RecordBuilder<Schema> builder = SchemaBuilder.record(stdName);
if (!stdName.equals(fieldName)) {
standardizedNames.put(fieldName, stdName);
-      LOGGER.warn("Schema name contains illegal character(s) and is standardized: {} -> {}", fieldName,
+      LOGGER.warn("Schema name \"{}\" contains illegal character(s) and is standardized to \"{}\"", fieldName,
stdName);
builder.doc(
String.format("%s%s%s",
@@ -159,7 +164,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
final SchemaBuilder.FieldBuilder<Schema> fieldBuilder = assembler.name(stdFieldName);
if (!stdFieldName.equals(subfieldName)) {
standardizedNames.put(subfieldName, stdFieldName);
-          LOGGER.warn("Field name contains illegal character(s) and is standardized: {} -> {}",
+          LOGGER.warn("Field name \"{}\" contains illegal character(s) and is standardized to \"{}\"",
subfieldName, stdFieldName);
fieldBuilder.doc(String.format("%s%s%s",
AvroConstants.DOC_KEY_ORIGINAL_NAME,
@@ -231,26 +236,33 @@ Schema parseSingleType(final String fieldName,
case ARRAY -> {
final JsonNode items = fieldDefinition.get("items");
if (items == null) {
-          LOGGER.warn("Array field {} does not specify the items type. It will be assumed to be an array of strings", fieldName);
+          LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
-          fieldSchema =
-              Schema.createArray(
-                  parseJsonField(String.format("%s.items", fieldName), fieldNamespace, items, appendExtraProps, addStringToLogicalTypes));
+          if (!items.has("type") || items.get("type").isNull()) {
+            LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName);
+            fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
+          } else {
+            // Objects inside a Json array have no names. We name them with the ".items" suffix.
+            final String elementFieldName = fieldName + ".items";
+            fieldSchema = Schema.createArray(parseJsonField(elementFieldName, fieldNamespace, items, appendExtraProps, addStringToLogicalTypes));
+          }
} else if (items.isArray()) {
final List<Schema> arrayElementTypes =
parseJsonTypeUnion(fieldName, fieldNamespace, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes);
arrayElementTypes.add(0, NULL_SCHEMA);
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
-          throw new IllegalStateException(
-              String.format("Array field %s has invalid items property: %s", fieldName, items));
+          LOGGER.warn("Array field \"{}\" has invalid items specification: {}. It will default to an array of strings.", fieldName, items);
+          fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
}
}
case OBJECT -> fieldSchema =
getAvroSchema(fieldDefinition, fieldName, fieldNamespace, false, appendExtraProps, addStringToLogicalTypes, false);
-      default -> throw new IllegalStateException(
-          String.format("Unexpected type for field %s: %s", fieldName, fieldType));
+      default -> {
+        LOGGER.warn("Field \"{}\" has invalid type definition: {}. It will default to string.", fieldName, fieldDefinition);
+        fieldSchema = Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA);
+      }
}
return fieldSchema;
}
@@ -267,7 +279,6 @@ List<Schema> parseJsonTypeUnion(final String fieldName,
final ArrayNode types,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
-    final List<JsonNode> typeList = MoreIterators.toList(types.elements());
final List<Schema> schemas = MoreIterators.toList(types.elements())
.stream()
.flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> {
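The fall-back logic this hunk introduces can be sketched in isolation. Below is a minimal, self-contained sketch (the class name and plain-string types are hypothetical stand-ins for the connector's Jackson/Avro types) of the "default to string" rule:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the new defaulting rule: any field whose
// "type" property is missing or unrecognized is treated as a string field
// instead of raising an IllegalStateException.
class TypeDefaultingSketch {

  private static final List<String> KNOWN_TYPES =
      List.of("string", "number", "integer", "boolean", "object", "array", "null");

  static List<String> getTypes(final String fieldName, final String typeProperty) {
    if (typeProperty == null) {
      // Mirrors: LOGGER.warn("Field \"{}\" has no type specification. It will default to string", ...)
      return Collections.singletonList("string");
    }
    if (KNOWN_TYPES.contains(typeProperty)) {
      return Collections.singletonList(typeProperty);
    }
    // Mirrors: LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", ...)
    return Collections.singletonList("string");
  }

  public static void main(String[] args) {
    System.out.println(getTypes("id", "integer"));   // [integer]
    System.out.println(getTypes("untyped", null));   // [string]
    System.out.println(getTypes("odd", "decimal"));  // [string]
  }
}
```

The design choice here is "warn and degrade" rather than "fail the sync": a schema with a missing or malformed type no longer aborts the whole connection.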
@@ -1512,5 +1512,55 @@
],
"_airbyte_additional_properties": null
}
},
{
"schemaName": "array_field_with_empty_items",
"namespace": "namespace20",
"appendAirbyteFields": false,
"jsonSchema": {
"type": "object",
"properties": {
"array_field": {
"type": "array",
"items": {}
}
}
},
"jsonObject": {
"array_field": [1234, true, "false", 0.001]
},
"avroSchema": {
"type": "record",
"name": "array_field_with_empty_items",
"namespace": "namespace20",
"fields": [
{
"name": "array_field",
"type": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
],
"default": null
},
{
"name": "_airbyte_additional_properties",
"type": [
"null",
{
"type": "map",
"values": "string"
}
],
"default": null
}
]
},
"avroObject": {
"array_field": ["1234", "true", "false", "0.001"],
"_airbyte_additional_properties": null
}
}
]
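The `array_field_with_empty_items` fixture above pairs a mixed-type Json array with an all-string Avro array. The value-side effect can be sketched as follows (the class and method names are hypothetical; the real connector performs this coercion during Avro serialization):

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: when an array's items default to ["null", "string"],
// every element is written using its string representation.
class ArrayCoercionSketch {

  static List<String> coerce(final List<Object> jsonArray) {
    return jsonArray.stream().map(String::valueOf).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Same sample values as the "array_field_with_empty_items" test case above.
    List<String> out = coerce(List.of(1234, true, "false", 0.001));
    System.out.println(out.equals(List.of("1234", "true", "false", "0.001"))); // prints true
  }
}
```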
@@ -201,7 +201,7 @@
]
},
{
-      "fieldName": "array_field_without_items_type",
+      "fieldName": "array_field_without_items",
"jsonFieldSchema": {
"type": "array"
},
@@ -212,5 +212,24 @@
"items": ["null", "string"]
}
]
},
{
"fieldName": "array_field_with_empty_items",
"jsonFieldSchema": {
"type": "array",
"items": {}
},
"avroFieldType": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
]
},
{
"fieldName": "field_without_type",
"jsonFieldSchema": {},
"avroFieldType": ["null", "string"]
}
]
1 change: 1 addition & 0 deletions docs/integrations/destinations/gcs.md
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.20 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. |
| 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user |
| 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |
| 0.1.17 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
@@ -223,6 +223,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.3 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. |
| 0.2.2 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.1 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. |
| 0.2.0 | 2021-12-15 | [\#8607](https://github.com/airbytehq/airbyte/pull/8607) | Change the output filename for CSV files - it's now `bucketPath/namespace/streamName/timestamp_epochMillis_randomUuid.csv` |
8 changes: 6 additions & 2 deletions docs/understanding-airbyte/json-avro-conversion.md
@@ -161,7 +161,7 @@ This is not supported in Avro schema. As a compromise, the converter creates a u
}
```

-If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects, each with a different `id` field.
+If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects. The first object has an `id` field, and the second has `id` and `message` fields. Their `id` fields have slightly different types.

Json schema:

@@ -223,7 +223,7 @@ Json object:
}
```

-Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also have their schemas merged.
+After conversion, the two object schemas will be merged into one. Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also be merged. In this way, all possible valid elements from the Json array can be converted to Avro records.

Avro schema:

@@ -468,6 +468,10 @@ the corresponding Avro schema and record will be:
}
```

### Untyped Field

Any field without property type specification will default to a `string` field, and its value will be serialized to string.
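For example (following the conventions of the other sections in this doc, with an illustrative field name), given a field defined without any type:

Json schema:

```json
{
  "type": "object",
  "properties": {
    "identifier": {}
  }
}
```

the converted Avro field type will be:

```json
{
  "name": "identifier",
  "type": ["null", "string"],
  "default": null
}
```

and whatever value appears under `identifier` will be written as its string representation.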

## Example

Based on the above rules, here is an overall example. Given the following Json schema: