Skip to content

Commit

Permalink
🐛 📝 don't missparse +/-infinity in postgres-source (#31856)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte authored Nov 9, 2023
1 parent 08a8757 commit 3f42418
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 63 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.7 | 2023-11-08 | [\#31856](https://github.com/airbytehq/airbyte/pull/31856) | source-postgres: support for inifinty date and timestamps |
| 0.4.5 | 2023-11-07 | [\#32112](https://github.com/airbytehq/airbyte/pull/32112) | Async destinations framework: Allow configuring the queue flush threshold |
| 0.4.4 | 2023-11-06 | [\#32119](https://github.com/airbytehq/airbyte/pull/32119) | Add STANDARD UUID codec to MongoDB debezium handler |
| 0.4.2 | 2023-11-06 | [\#32190](https://github.com/airbytehq/airbyte/pull/32190) | Improve error deinterpolation |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,39 @@
},
"Date": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$",
"description": "RFC 3339\u00a75.6's full-date format, extended with BC era support"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "RFC 3339\u00a75.6's full-date format, extended with BC era support and (-)Infinity"
},
"TimestampWithTimezone": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$",
"description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support. Note that we do _not_ accept Unix epochs here.\n"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support and (-)Infinity. Note that we do _not_ accept Unix epochs here.\n"
},
"TimestampWithoutTimezone": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$",
"description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support.\n"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support and (-)Infinity.\n"
},
"TimeWithTimezone": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airbyte.cdk.db.jdbc.DateTimeConverter;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumConverterUtils;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
Expand Down Expand Up @@ -239,11 +240,13 @@ private int getTimePrecision(final RelationalColumn field) {
return field.scale().orElse(-1);
}

private final String POSITIVE_INFINITY_VALUE = "Infinity";
private final String NEGATIVE_INFINITY_VALUE = "-Infinity";

// Ref :
// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-temporal-types
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
Expand All @@ -252,8 +255,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
case "TIMETZ":
return DateTimeConverter.convertToTimeWithTimezone(x);
case "TIMESTAMPTZ":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_OFFSET_DATE_TIME)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_OFFSET_DATE_TIME)) {
return POSITIVE_INFINITY_VALUE;
}
return DateTimeConverter.convertToTimestampWithTimezone(x);
case "TIMESTAMP":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_INSTANT)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_INSTANT)) {
return POSITIVE_INFINITY_VALUE;
}
if (x instanceof final Long l) {
if (getTimePrecision(field) <= 3) {
return convertToTimestamp(Conversions.toInstantFromMillis(l));
Expand All @@ -264,6 +279,12 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
}
return convertToTimestamp(x);
case "DATE":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_LOCAL_DATE)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_LOCAL_DATE)) {
return POSITIVE_INFINITY_VALUE;
}
if (x instanceof Integer) {
return convertToDate(LocalDate.ofEpochDay((Integer) x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public static StateManager createStateManager(final AirbyteStateType supportedSt
switch (supportedStateType) {
case LEGACY:
LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getType());
return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
@SuppressWarnings("deprecation")
StateManager retVal = new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
return retVal;
case GLOBAL:
LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getType());
return new GlobalStateManager(generateGlobalState(airbyteStateMessage), catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.4.6'
cdkVersionRequired = '0.4.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.2.20
dockerImageTag: 3.2.21
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,19 @@
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.*;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.postgresql.PGStatement;
import org.postgresql.geometric.PGbox;
import org.postgresql.geometric.PGcircle;
import org.postgresql.geometric.PGline;
Expand All @@ -67,6 +65,15 @@ public class PostgresSourceOperations extends AbstractJdbcCompatibleSourceOperat
private static final Map<Integer, PostgresType> POSTGRES_TYPE_DICT = new HashMap<>();
private final Map<String, Map<String, ColumnInfo>> streamColumnInfo = new HashMap<>();

private static final String POSITIVE_INFINITY_STRING = "Infinity";
private static final String NEGATIVE_INFINITY_STRING = "-Infinity";
private static final Date POSITIVE_INFINITY_DATE = new Date(PGStatement.DATE_POSITIVE_INFINITY);
private static final Date NEGATIVE_INFINITY_DATE = new Date(PGStatement.DATE_NEGATIVE_INFINITY);
private static final Timestamp POSITIVE_INFINITY_TIMESTAMP = new Timestamp(PGStatement.DATE_POSITIVE_INFINITY);
private static final Timestamp NEGATIVE_INFINITY_TIMESTAMP = new Timestamp(PGStatement.DATE_NEGATIVE_INFINITY);
private static final OffsetDateTime POSITIVE_INFINITY_OFFSET_DATE_TIME = OffsetDateTime.MAX;
private static final OffsetDateTime NEGATIVE_INFINITY_OFFSET_DATE_TIME = OffsetDateTime.MIN;

static {
Arrays.stream(PostgresType.class.getEnumConstants()).forEach(c -> POSTGRES_TYPE_DICT.put(c.type, c));
}
Expand All @@ -92,6 +99,8 @@ public void setCursorField(final PreparedStatement preparedStatement,
final PostgresType cursorFieldType,
final String value)
throws SQLException {

LOGGER.warn("SGX setCursorField value=" + value + "cursorFieldType=" + cursorFieldType);
switch (cursorFieldType) {

case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
Expand Down Expand Up @@ -265,6 +274,10 @@ private void putTimestampArray(final ObjectNode node, final String columnName, f
final Timestamp timestamp = arrayResultSet.getTimestamp(2);
if (timestamp == null) {
arrayNode.add(NullNode.getInstance());
} else if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
arrayNode.add(POSITIVE_INFINITY_STRING);
} else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
arrayNode.add(NEGATIVE_INFINITY_STRING);
} else {
arrayNode.add(DateTimeConverter.convertToTimestamp(timestamp));
}
Expand All @@ -280,6 +293,10 @@ private void putTimestampTzArray(final ObjectNode node, final String columnName,
final OffsetDateTime timestamptz = getObject(arrayResultSet, 2, OffsetDateTime.class);
if (timestamptz == null) {
arrayNode.add(NullNode.getInstance());
} else if (POSITIVE_INFINITY_OFFSET_DATE_TIME.equals(timestamptz)) {
arrayNode.add(POSITIVE_INFINITY_STRING);
} else if (NEGATIVE_INFINITY_OFFSET_DATE_TIME.equals(timestamptz)) {
arrayNode.add(NEGATIVE_INFINITY_STRING);
} else {
final LocalDate localDate = timestamptz.toLocalDate();
arrayNode.add(resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
Expand All @@ -292,9 +309,13 @@ private void putDateArray(final ObjectNode node, final String columnName, final
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final LocalDate date = getObject(arrayResultSet, 2, LocalDate.class);
final Date date = getObject(arrayResultSet, 2, Date.class);
if (date == null) {
arrayNode.add(NullNode.getInstance());
} else if (POSITIVE_INFINITY_DATE.equals(date)) {
arrayNode.add(POSITIVE_INFINITY_STRING);
} else if (NEGATIVE_INFINITY_DATE.equals(date)) {
arrayNode.add(NEGATIVE_INFINITY_STRING);
} else {
arrayNode.add(DateTimeConverter.convertToDate(date));
}
Expand Down Expand Up @@ -391,9 +412,57 @@ private void putLongArray(final ObjectNode node, final String columnName, final
node.set(columnName, arrayNode);
}

@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
Date dateFromResultSet = resultSet.getDate(index);
if (POSITIVE_INFINITY_DATE.equals(dateFromResultSet)) {
node.put(columnName, POSITIVE_INFINITY_STRING);
} else if (NEGATIVE_INFINITY_DATE.equals(dateFromResultSet)) {
node.put(columnName, NEGATIVE_INFINITY_STRING);
} else {
super.putDate(node, columnName, resultSet, index);
}
}

@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
super.putTime(node, columnName, resultSet, index);
}

@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeConverter.convertToTimestamp(resultSet.getTimestamp(index)));
Timestamp timestampFromResultSet = resultSet.getTimestamp(index);
String strValue = resultSet.getString(index);
if (POSITIVE_INFINITY_TIMESTAMP.equals(timestampFromResultSet)) {
node.put(columnName, POSITIVE_INFINITY_STRING);
} else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestampFromResultSet)) {
node.put(columnName, NEGATIVE_INFINITY_STRING);
} else {
node.put(columnName, DateTimeConverter.convertToTimestamp(timestampFromResultSet));
}
}

@Override
protected void putTimeWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
node.put(columnName, DateTimeConverter.convertToTimeWithTimezone(timetz));
}

@Override
protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
throws SQLException {
final OffsetDateTime timestampTz = getObject(resultSet, index, OffsetDateTime.class);
final String timestampTzVal;
if (POSITIVE_INFINITY_OFFSET_DATE_TIME.equals(timestampTz)) {
timestampTzVal = POSITIVE_INFINITY_STRING;
} else if (NEGATIVE_INFINITY_OFFSET_DATE_TIME.equals(timestampTz)) {
timestampTzVal = NEGATIVE_INFINITY_STRING;
} else {
final LocalDate localDate = timestampTz.toLocalDate();
timestampTzVal = resolveEra(localDate, timestampTz.format(TIMESTAMPTZ_FORMATTER));
}

node.put(columnName, timestampTzVal);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ protected void initTests() {
.sourceType("date")
.fullSourceDataType(type)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "'2022/11/12'", "'1987.12.01'")
.addExpectedValues("1999-01-08", "1991-02-10 BC", "2022-11-12", "1987-12-01")
.addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "'2022/11/12'", "'1987.12.01'", "'-InFinITy'", "'InFinITy'")
.addExpectedValues("1999-01-08", "1991-02-10 BC", "2022-11-12", "1987-12-01", "-Infinity", "Infinity")
.build());
}

Expand Down Expand Up @@ -460,14 +460,16 @@ protected void initTests() {
"TIMESTAMP '0001-01-01 00:00:00.000000'",
// The last possible timestamp in BCE
"TIMESTAMP '0001-12-31 23:59:59.999999 BC'",
"'epoch'")
"'epoch'",
"'-InFinITy'", "'InFinITy'")
.addExpectedValues(
"2004-10-19T10:23:00.000000",
"2004-10-19T10:23:54.123456",
"3004-10-19T10:23:54.123456 BC",
"0001-01-01T00:00:00.000000",
"0001-12-31T23:59:59.999999 BC",
"1970-01-01T00:00:00.000000")
"1970-01-01T00:00:00.000000",
"-Infinity", "Infinity")
.build());
}

Expand All @@ -483,8 +485,6 @@ protected void initTests() {
.build());
}

addTimestampWithInfinityValuesTest();

// timestamp with time zone
for (final String fullSourceType : Set.of("timestamptz", "timestamp with time zone")) {
addDataTypeTestData(
Expand All @@ -503,14 +503,14 @@ protected void initTests() {
"TIMESTAMP WITH TIME ZONE '0001-12-31 16:00:00.000000-08 BC'",
// The last possible timestamp in BCE (15:59-08 == 23:59Z)
"TIMESTAMP WITH TIME ZONE '0001-12-31 15:59:59.999999-08 BC'",
"null")
"null", "'-InFinITy'", "'InFinITy'")
.addExpectedValues(
"2004-10-19T18:23:00.000000Z",
"2004-10-19T18:23:54.123456Z",
"3004-10-19T18:23:54.123456Z BC",
"0001-01-01T00:00:00.000000Z",
"0001-12-31T23:59:59.999999Z BC",
null)
null, "-Infinity", "Infinity")
.build());
}

Expand Down Expand Up @@ -640,24 +640,6 @@ protected void addTimeWithTimeZoneTest() {
}
}

protected void addTimestampWithInfinityValuesTest() {
// timestamp without time zone
for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone", "timestamp without time zone not null default now()")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues(
"'infinity'",
"'-infinity'")
.addExpectedValues(
"+292278994-08-16T23:00:00.000000",
"+292269055-12-02T23:00:00.000000 BC")
.build());
}
}

private void addArraysTestData() {
addDataTypeTestData(
TestDataHolder.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,6 @@ protected void addTimeWithTimeZoneTest() {
}
}

@Override
protected void addTimestampWithInfinityValuesTest() {
// timestamp without time zone
for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone", "timestamp without time zone not null default now()")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timestamp")
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues(
"'infinity'",
"'-infinity'")
.addExpectedValues(
"+294247-01-10T04:00:25.200000",
"+290309-12-21T19:59:27.600000 BC")
.build());
}
}

@Override
protected void addNumericValuesTest() {
addDataTypeTestData(
Expand Down
Loading

0 comments on commit 3f42418

Please sign in to comment.