Skip to content

Commit

Permalink
Base Java S3: Update Avro TimeWithTimezone schema mapping (#21909)
Browse files Browse the repository at this point in the history
* 21908 Base Java S3: Update Avro TimeWithTimezone schema mapping

* 21908 Base Java S3: Formatting

* 21908 Base Java S3: fix integration test gcs + S3

* 21908 Base Java S3: fix unit test

* 21908 Base Java S3: fix format
  • Loading branch information
suhomud authored Jan 30, 2023
1 parent f981668 commit 2226a2e
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public enum JsonSchemaType {
DATE_V1("WellKnownTypes.json#/definitions/Date", Schema.Type.INT),
TIMESTAMP_WITH_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimestampWithTimezone", Schema.Type.LONG),
TIMESTAMP_WITHOUT_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimestampWithoutTimezone", Schema.Type.LONG),
TIME_WITH_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimeWithTimezone", Schema.Type.LONG),
TIME_WITH_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimeWithTimezone", Schema.Type.STRING),
TIME_WITHOUT_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimeWithoutTimezone", Schema.Type.LONG),
OBJECT("object", Schema.Type.RECORD),
ARRAY("array", Schema.Type.ARRAY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ Schema parseSingleType(final String fieldName,

final Schema fieldSchema;
switch (fieldType) {
case INTEGER_V1, NUMBER_V1, BOOLEAN_V1, STRING_V1, BINARY_DATA_V1 -> fieldSchema = Schema.create(fieldType.getAvroType());
case INTEGER_V1, NUMBER_V1, BOOLEAN_V1, STRING_V1, TIME_WITH_TIMEZONE_V1, BINARY_DATA_V1 -> fieldSchema =
Schema.create(fieldType.getAvroType());
case DATE_V1 -> fieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
case TIMESTAMP_WITH_TIMEZONE_V1, TIMESTAMP_WITHOUT_TIMEZONE_V1 -> fieldSchema = LogicalTypes.timestampMicros()
.addToSchema(Schema.create(Schema.Type.LONG));
case TIME_WITH_TIMEZONE_V1, TIME_WITHOUT_TIMEZONE_V1 -> fieldSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
case TIME_WITHOUT_TIMEZONE_V1 -> fieldSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
case INTEGER_V0, NUMBER_V0, NUMBER_INT_V0, NUMBER_BIGINT_V0, NUMBER_FLOAT_V0, BOOLEAN_V0 -> fieldSchema =
Schema.create(fieldType.getAvroType());
case STRING_V0 -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@
"jsonFieldSchema": {
"$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone"
},
"avroFieldType": [
"null",
{ "type": "long", "logicalType": "time-micros" },
"string"
]
"avroFieldType": ["null", "string"]
},
{
"fieldName": "array_field_without_items",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,10 @@ protected boolean compareJsonNodes(final JsonNode expectedValue, final JsonNode
return compareDateTimeValues(expectedValue.asText(), actualValue.asText());
} else if (isDateValue(expectedValue.asText())) {
return compareDateValues(expectedValue.asText(), actualValue.asText());
} else if (isTimeWithTimezone(expectedValue.asText()) || isTimeWithoutTimezone(expectedValue.asText())) {
return compareTime(expectedValue.asText(), actualValue.asText());
} else if (isTimeWithTimezone(expectedValue.asText())) {
return compareTimeWithTimeZone(expectedValue.asText(), actualValue.asText());
} else if (isTimeWithoutTimezone(expectedValue.asText())) {
return compareTimeWithoutTimeZone(expectedValue.asText(), actualValue.asText());
} else if (expectedValue.isArray()) {
return compareArrays(expectedValue, actualValue);
} else if (expectedValue.isObject()) {
Expand Down Expand Up @@ -215,7 +217,11 @@ protected boolean compareDateValues(final String airbyteMessageValue, final Stri
return compareTextValues(airbyteMessageValue, destinationValue);
}

protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) {
protected boolean compareTimeWithoutTimeZone(final String airbyteMessageValue, final String destinationValue) {
return compareTextValues(airbyteMessageValue, destinationValue);
}

protected boolean compareTimeWithTimeZone(final String airbyteMessageValue, final String destinationValue) {
return compareTextValues(airbyteMessageValue, destinationValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected boolean compareDateTimeValues(String airbyteMessageValue, String desti
}

@Override
protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) {
protected boolean compareTimeWithoutTimeZone(final String airbyteMessageValue, final String destinationValue) {
LocalTime destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
LocalTime expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME);
return expectedDate.equals(destinationDate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protected boolean compareDateTimeValues(String airbyteMessageValue, String desti
}

@Override
protected boolean compareTime(final String airbyteMessageValue, final String destinationValue) {
protected boolean compareTimeWithoutTimeZone(final String airbyteMessageValue, final String destinationValue) {
var destinationDate = LocalTime.ofInstant(getInstantFromEpoch(destinationValue), ZoneOffset.UTC);
var expectedDate = LocalTime.parse(airbyteMessageValue, DateTimeFormatter.ISO_TIME);
return expectedDate.equals(destinationDate);
Expand Down

0 comments on commit 2226a2e

Please sign in to comment.