🐞 Destination S3 & GCS Avro: support array with unknown item type #9367

Merged
merged 14 commits into from
Jan 12, 2022
Original file line number Diff line number Diff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery-denormalized

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.6.1
LABEL io.airbyte.version=0.6.2
LABEL io.airbyte.name=airbyte/destination-bigquery
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/destination-gcs
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.name=airbyte/destination-s3
@@ -51,6 +51,9 @@ static List&lt;JsonSchemaType&gt; getNonNullTypes(final String fieldName, final JsonNo
.filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList());
}

/**
* When no type is specified, it will default to string.
*/
static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fieldDefinition) {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
@@ -59,7 +62,8 @@ static List&lt;JsonSchemaType&gt; getTypes(final String fieldName, final JsonNode fiel

final JsonNode typeProperty = fieldDefinition.get("type");
if (typeProperty == null || typeProperty.isNull()) {
throw new IllegalStateException(String.format("Field %s has no type", fieldName));
LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName);
return Collections.singletonList(JsonSchemaType.STRING);
}

if (typeProperty.isArray()) {
@@ -72,7 +76,8 @@ static List&lt;JsonSchemaType&gt; getTypes(final String fieldName, final JsonNode fiel
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
}

throw new IllegalStateException("Unexpected type: " + typeProperty);
LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty);
return Collections.singletonList(JsonSchemaType.STRING);
}
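The fallback behavior above can be sketched in isolation. This is a minimal, hypothetical sketch (class and method names are illustrative, and the real method works on Jackson `JsonNode` values and returns `JsonSchemaType` instances, not plain strings): a missing type no longer throws an `IllegalStateException`; the field simply defaults to string.

```java
import java.util.Collections;
import java.util.List;

public class TypeFallbackSketch {

  // Mirrors the change above: when a field has no "type" entry, the sync no
  // longer fails; the field is treated as a string instead.
  static List<String> getTypes(final String fieldName, final String typeProperty) {
    if (typeProperty == null) {
      System.err.printf("Field \"%s\" has no type specification. It will default to string%n", fieldName);
      return Collections.singletonList("string");
    }
    return Collections.singletonList(typeProperty);
  }

  public static void main(final String[] args) {
    System.out.println(getTypes("untyped_field", null));    // prints [string]
    System.out.println(getTypes("typed_field", "integer")); // prints [integer]
  }
}
```

The same default is applied when the type property is present but unexpected, as in the second warning below.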

static Optional<JsonNode> getCombinedRestriction(final JsonNode fieldDefinition) {
@@ -120,7 +125,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
final SchemaBuilder.RecordBuilder<Schema> builder = SchemaBuilder.record(stdName);
if (!stdName.equals(fieldName)) {
standardizedNames.put(fieldName, stdName);
LOGGER.warn("Schema name contains illegal character(s) and is standardized: {} -> {}", fieldName,
LOGGER.warn("Schema name \"{}\" contains illegal character(s) and is standardized to \"{}\"", fieldName,
stdName);
builder.doc(
String.format("%s%s%s",
@@ -159,7 +164,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
final SchemaBuilder.FieldBuilder<Schema> fieldBuilder = assembler.name(stdFieldName);
if (!stdFieldName.equals(subfieldName)) {
standardizedNames.put(subfieldName, stdFieldName);
LOGGER.warn("Field name contains illegal character(s) and is standardized: {} -> {}",
LOGGER.warn("Field name \"{}\" contains illegal character(s) and is standardized to \"{}\"",
subfieldName, stdFieldName);
fieldBuilder.doc(String.format("%s%s%s",
AvroConstants.DOC_KEY_ORIGINAL_NAME,
@@ -231,26 +236,33 @@ Schema parseSingleType(final String fieldName,
case ARRAY -> {
final JsonNode items = fieldDefinition.get("items");
if (items == null) {
LOGGER.warn("Array field {} does not specify the items type. It will be assumed to be an array of strings", fieldName);
LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
fieldSchema =
Schema.createArray(
parseJsonField(String.format("%s.items", fieldName), fieldNamespace, items, appendExtraProps, addStringToLogicalTypes));
if (!items.has("type") || items.get("type").isNull()) {
LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else {
// Objects inside a Json array have no names. We name them with the ".items" suffix.
final String elementFieldName = fieldName + ".items";
fieldSchema = Schema.createArray(parseJsonField(elementFieldName, fieldNamespace, items, appendExtraProps, addStringToLogicalTypes));
}
} else if (items.isArray()) {
final List<Schema> arrayElementTypes =
parseJsonTypeUnion(fieldName, fieldNamespace, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes);
arrayElementTypes.add(0, NULL_SCHEMA);
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
throw new IllegalStateException(
String.format("Array field %s has invalid items property: %s", fieldName, items));
LOGGER.warn("Array field \"{}\" has invalid items specification: {}. It will default to an array of strings.", fieldName, items);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
}
}
case OBJECT -> fieldSchema =
getAvroSchema(fieldDefinition, fieldName, fieldNamespace, false, appendExtraProps, addStringToLogicalTypes, false);
default -> throw new IllegalStateException(
String.format("Unexpected type for field %s: %s", fieldName, fieldType));
default -> {
LOGGER.warn("Field \"{}\" has invalid type definition: {}. It will default to string.", fieldName, fieldDefinition);
fieldSchema = Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA);
}
}
return fieldSchema;
}
@@ -267,7 +279,6 @@ List&lt;Schema&gt; parseJsonTypeUnion(final String fieldName,
final ArrayNode types,
final boolean appendExtraProps,
final boolean addStringToLogicalTypes) {
final List<JsonNode> typeList = MoreIterators.toList(types.elements());
final List<Schema> schemas = MoreIterators.toList(types.elements())
.stream()
.flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> {
@@ -1512,5 +1512,55 @@
],
"_airbyte_additional_properties": null
}
},
{
"schemaName": "array_field_with_empty_items",
"namespace": "namespace20",
"appendAirbyteFields": false,
"jsonSchema": {
"type": "object",
"properties": {
"array_field": {
"type": "array",
"items": {}
}
}
},
"jsonObject": {
"array_field": [1234, true, "false", 0.001]
},
"avroSchema": {
"type": "record",
"name": "array_field_with_empty_items",
"namespace": "namespace20",
"fields": [
{
"name": "array_field",
"type": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
],
"default": null
},
{
"name": "_airbyte_additional_properties",
"type": [
"null",
{
"type": "map",
"values": "string"
}
],
"default": null
}
]
},
"avroObject": {
"array_field": ["1234", "true", "false", "0.001"],
"_airbyte_additional_properties": null
}
}
]
@@ -201,7 +201,7 @@
]
},
{
"fieldName": "array_field_without_items_type",
"fieldName": "array_field_without_items",
"jsonFieldSchema": {
"type": "array"
},
@@ -212,5 +212,24 @@
"items": ["null", "string"]
}
]
},
{
"fieldName": "array_field_with_empty_items",
"jsonFieldSchema": {
"type": "array",
"items": {}
},
"avroFieldType": [
"null",
{
"type": "array",
"items": ["null", "string"]
}
]
},
{
"fieldName": "field_without_type",
"jsonFieldSchema": {},
"avroFieldType": ["null", "string"]
}
]
2 changes: 2 additions & 0 deletions docs/integrations/destinations/bigquery.md
@@ -153,6 +153,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.6.2 | 2022-01-09 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Support array field with unknown item type. Default any improperly typed field to string. |
| 0.6.1 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.6.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/issues/8788) | BigQuery/BigQuery denorm Destinations : Add possibility to use different types of GCS files |
| 0.5.1 | 2021-12-16 | [\#8816](https://github.com/airbytehq/airbyte/issues/8816) | Update dataset locations |
@@ -170,6 +171,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
|:--------| :--- | :--- | :--- |
| 0.2.3 | 2022-01-09 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Support array field with unknown item type. Default any improperly typed field to string. |
| 0.2.2 | 2021-12-22 | [\#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration to UI for GCS staging |
| 0.2.1 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.0 | 2021-12-17 | [\#8788](https://github.com/airbytehq/airbyte/pull/8788) | BigQuery/BigQuery denorm Destinations : Add possibility to use different types of GCS files |
1 change: 1 addition & 0 deletions docs/integrations/destinations/gcs.md
@@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.19 | 2022-01-09 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Support array field with unknown item type. Default any improperly typed field to string. |
| 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description |
| 0.1.17 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.1.16 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. |
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
@@ -223,6 +223,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.3 | 2022-01-09 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Support array field with unknown item type. Default any improperly typed field to string. |
| 0.2.2 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types |
| 0.2.1 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. |
| 0.2.0 | 2021-12-15 | [\#8607](https://github.com/airbytehq/airbyte/pull/8607) | Change the output filename for CSV files - it's now `bucketPath/namespace/streamName/timestamp_epochMillis_randomUuid.csv` |
8 changes: 6 additions & 2 deletions docs/understanding-airbyte/json-avro-conversion.md
@@ -161,7 +161,7 @@ This is not supported in Avro schema. As a compromise, the converter creates a u
}
```

If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects, each with a different `id` field.
If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects. The first object has an `id` field, and the second has `id` and `message` fields. Their `id` fields have slightly different types.

Json schema:

@@ -223,7 +223,7 @@ Json object:
}
```

Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also have their schemas merged.
After conversion, the two object schemas will be merged into one. Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also be merged. In this way, all possible valid elements from the Json array can be converted to Avro records.

Avro schema:

@@ -468,6 +468,10 @@ the corresponding Avro schema and record will be:
}
```

### Untyped Field

Any field without a type specification will default to a `string` field, and its value will be serialized to a string.
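For example, a field declared with an empty schema (as in the `field_without_type` test case in this PR) is converted to a nullable string field. The record name below is illustrative:

Json schema:

```json
{
  "type": "object",
  "properties": {
    "field_without_type": {}
  }
}
```

Avro schema:

```json
{
  "type": "record",
  "name": "example",
  "fields": [
    {
      "name": "field_without_type",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
```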

## Example

Based on the above rules, here is an overall example. Given the following Json schema: