🎉 Destination S3 Support writing timestamps #7732

Merged 17 commits on Nov 30, 2021

@@ -2,7 +2,7 @@
   "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
   "name": "S3",
   "dockerRepository": "airbyte/destination-s3",
-  "dockerImageTag": "0.1.13",
+  "dockerImageTag": "0.1.14",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3",
   "icon": "s3.svg"
 }

@@ -139,7 +139,7 @@
 - name: S3
   destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.1.13
+  dockerImageTag: 0.1.14
   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
   icon: s3.svg
 - name: SFTP-JSON

@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

 RUN tar xf ${APPLICATION}.tar --strip-components=1

-LABEL io.airbyte.version=0.1.13
+LABEL io.airbyte.version=0.1.14
 LABEL io.airbyte.name=airbyte/destination-s3

@@ -40,6 +40,7 @@ public class JsonToAvroSchemaConverter {
   private static final Schema UUID_SCHEMA = LogicalTypes.uuid()
       .addToSchema(Schema.create(Schema.Type.STRING));
   private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
+  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
   private static final Logger LOGGER = LoggerFactory.getLogger(JsonToAvroSchemaConverter.class);
   private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis()
       .addToSchema(Schema.create(Schema.Type.LONG));
@@ -172,7 +173,20 @@ Schema getSingleFieldType(final String fieldName, final JsonSchemaType fieldType

     final Schema fieldSchema;
     switch (fieldType) {
-      case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
+      case NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
+      case STRING -> {
+        if (fieldDefinition.has("format")) {
+          String format = fieldDefinition.get("format").asText();
+          fieldSchema = switch (format) {
+            case "date-time" -> LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+            case "date" -> LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+            case "time" -> LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+            default -> Schema.create(fieldType.getAvroType());
+          };
+        } else {
+          fieldSchema = Schema.create(fieldType.getAvroType());
+        }
+      }
       case COMBINED -> {
         final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
         final List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
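
The mapping in the hunk above, from a JSON schema string `format` to an Avro logical type, can be sketched without the Avro library. This is an illustration only: the class `FormatToLogicalType` and its methods are hypothetical names, and it returns the Avro type as a JSON fragment rather than building `org.apache.avro.Schema` objects as the converter does.

```java
// Hypothetical helper for illustration; not part of the connector.
final class FormatToLogicalType {

  // Returns the Avro type, as a JSON fragment, for a JSON schema string field
  // with the given "format" keyword. A null format means the keyword is absent.
  static String avroTypeFor(String format) {
    if (format == null) {
      return "\"string\"";
    }
    return switch (format) {
      case "date-time" -> logical("long", "timestamp-micros");
      case "date" -> logical("int", "date");
      case "time" -> logical("long", "time-micros");
      // Any other format stays a plain string, as in the default branch of the diff.
      default -> "\"string\"";
    };
  }

  // Renders an Avro logical type annotation on top of a primitive type.
  private static String logical(String primitive, String logicalType) {
    return "{ \"type\": \"" + primitive + "\", \"logicalType\": \"" + logicalType + "\" }";
  }

  public static void main(String[] args) {
    for (String format : new String[] {"date-time", "date", "time", "email"}) {
      System.out.println(format + " -> " + avroTypeFor(format));
    }
  }
}
```

For `date-time` the fragment matches the `{ "type": "long", "logicalType": "timestamp-micros" }` entry that appears in the updated test schemas in this PR.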

@@ -240,6 +254,14 @@ Schema getNullableFieldTypes(final String fieldName, final JsonNode fieldDefinit
       if (!nonNullFieldTypes.contains(NULL_SCHEMA)) {
         nonNullFieldTypes.add(0, NULL_SCHEMA);
       }
+      // Logical types are converted to a union of logical type itself and string. The purpose is to
+      // default the logical type field to a string, if the value of the logical type field is invalid and
+      // cannot be properly processed.
+      if ((nonNullFieldTypes

Review comment (Contributor):

Can you add a comment before this if block to explain why the string schema is added at the end?

Something like:

Logical types are converted to a union of logical type itself and string. The purpose is to default the logical type field to a string, if the value of the logical type field is invalid and cannot be properly processed.

Reply (Contributor Author):

done

+          .stream().anyMatch(schema -> schema.getLogicalType() != null)) &&
+          (!nonNullFieldTypes.contains(STRING_SCHEMA))) {
+        nonNullFieldTypes.add(STRING_SCHEMA);
+      }
       return Schema.createUnion(nonNullFieldTypes);
     }
   }
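
The code comment in this hunk says the string branch of the union is there so an invalid logical-type value can fall back to a string. How the connector converts values at write time is not shown in this diff, so the following is only a sketch of that fallback idea: `TimestampOrString` and `toTimestampMicrosOrString` are hypothetical names, and it assumes the incoming value is an ISO-8601 instant.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration; not part of the connector.
final class TimestampOrString {

  // Returns a Long (microseconds since the Unix epoch) when the value parses as
  // an ISO-8601 instant; otherwise returns the original String unchanged, which
  // corresponds to the string branch of the union.
  static Object toTimestampMicrosOrString(String value) {
    try {
      Instant instant = Instant.parse(value);
      return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    } catch (DateTimeParseException e) {
      return value;
    }
  }

  public static void main(String[] args) {
    System.out.println(toTimestampMicrosOrString("2021-10-23T09:40:00Z")); // prints 1634982000000000
    System.out.println(toTimestampMicrosOrString("not a timestamp"));      // prints not a timestamp
  }
}
```

Returning either a `Long` or the original `String` mirrors a union of `timestamp-micros` and `string`: the record can still be written when the value does not parse.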

@@ -388,7 +388,12 @@
   "fields": [
     {
       "name": "created_at",
-      "type": ["null", "string", "int"],
+      "type": [
+        "null",
+        { "type": "long", "logicalType": "timestamp-micros" },
+        "string",
+        "int"
+      ],
       "default": null
     },
     {
@@ -451,7 +456,12 @@
   "fields": [
     {
       "name": "created_at",
-      "type": ["null", "string", "int"],
+      "type": [
+        "null",
+        { "type": "long", "logicalType": "timestamp-micros" },
+        "string",
+        "int"
+      ],
       "default": null
     },
     {
@@ -473,7 +483,7 @@
   },
   "avroObject": {
     "user": {
-      "created_at": "1634982000",
+      "created_at": 1634982000,
       "_airbyte_additional_properties": null
     },
     "_airbyte_additional_properties": null

docs/integrations/destinations/s3.md (1 addition, 0 deletions)
@@ -224,6 +224,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.14 | 2021-11-09 | [\#7732](https://github.com/airbytehq/airbyte/pull/7732) | Support timestamp in Avro and Parquet |
 | 0.1.13 | 2021-11-03 | [\#7288](https://github.com/airbytehq/airbyte/issues/7288) | Support Json `additionalProperties`. |
 | 0.1.12 | 2021-09-13 | [\#5720](https://github.com/airbytehq/airbyte/issues/5720) | Added configurable block size for stream. Each stream is limited to 10,000 by S3 |
 | 0.1.11 | 2021-09-10 | [\#5729](https://github.com/airbytehq/airbyte/pull/5729) | For field names that start with a digit, a `_` will be appended at the beginning for the`Parquet` and `Avro` formats. |