Source Postgres: Handle Arrays data types #16990

Merged: 26 commits merged into master from vmaltsev/15157-postgres-handle-array-types on Nov 28, 2022. The diff below shows changes from 20 of the 26 commits.

Commits (26)
3379208
Source Postgres: Handle Arrays data types
VitaliiMaltsev Sep 21, 2022
8e3c18a
bump version
VitaliiMaltsev Sep 27, 2022
7a12915
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Sep 30, 2022
e45ca3f
updated items mapping
VitaliiMaltsev Oct 3, 2022
8feb061
updated mapping of common types
VitaliiMaltsev Oct 7, 2022
4ef8894
add datatype tests
VitaliiMaltsev Oct 10, 2022
4cec767
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Oct 10, 2022
2f9ca39
removed redundant variables
VitaliiMaltsev Oct 10, 2022
d588ead
Fixed getSpec schema test
etsybaev Oct 10, 2022
3e418a0
removed redundant variables
VitaliiMaltsev Oct 10, 2022
3f0fbd7
Merge remote-tracking branch 'origin/vmaltsev/15157-postgres-handle-a…
VitaliiMaltsev Oct 10, 2022
afea462
refactoring
VitaliiMaltsev Oct 10, 2022
b577bf6
updated json schema mappings
VitaliiMaltsev Oct 12, 2022
ed915bc
updated debezium converters
VitaliiMaltsev Oct 17, 2022
1e74d51
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Oct 17, 2022
135081a
removed unused logging
VitaliiMaltsev Oct 17, 2022
154e884
refactoring
VitaliiMaltsev Oct 17, 2022
2d5e680
refactoring and added comments
VitaliiMaltsev Oct 27, 2022
80785cc
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Oct 27, 2022
86c48be
fixed checkstyle
VitaliiMaltsev Oct 27, 2022
f6f3749
refactoring
VitaliiMaltsev Nov 8, 2022
9e0a2a9
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Nov 22, 2022
7d65724
Merge branch 'master' into vmaltsev/15157-postgres-handle-array-types
VitaliiMaltsev Nov 28, 2022
2a26f7b
Merge remote-tracking branch 'origin/vmaltsev/15157-postgres-handle-a…
VitaliiMaltsev Nov 28, 2022
b0c2fc5
bump version
VitaliiMaltsev Nov 28, 2022
6d37e38
auto-bump connector version
octavia-squidington-iii Nov 28, 2022
PostgresConverter.java
@@ -4,20 +4,38 @@

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import static io.airbyte.db.jdbc.DateTimeConverter.convertToDate;
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTime;
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTimestamp;
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTimestampWithTimezone;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;

import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Hex;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.jdbc.PgArray;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +51,7 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private final String[] TEXT_TYPES =
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
private final String BYTEA_TYPE = "BYTEA";

@Override
@@ -52,9 +71,22 @@ public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registerBytea(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerArray(field, registration);
}
}

private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
final String fieldType = field.typeName().toUpperCase();
final SchemaBuilder arraySchema = switch (fieldType) {
case "_NUMERIC", "_MONEY" -> SchemaBuilder.array(OPTIONAL_FLOAT64_SCHEMA);
case "_NAME", "_DATE", "_TIME", "_TIMESTAMP", "_TIMESTAMPTZ", "_TIMETZ", "_BYTEA" -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
case "_BIT" -> SchemaBuilder.array(OPTIONAL_BOOLEAN_SCHEMA);
default -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
};
registration.register(arraySchema, x -> convertArray(x, field));
}

private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
@@ -106,6 +138,72 @@ private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
});
}

private Object convertArray(Object x, RelationalColumn field) {
final String fieldType = field.typeName().toUpperCase();
Object[] values = new Object[0];
try {
values = (Object[]) ((PgArray) x).getArray();
} catch (SQLException e) {
LOGGER.error("Failed to convert PgArray:" + e);
}
switch (fieldType) {
// Debezium currently cannot handle the MONEY[] datatype; support for it is not implemented upstream.
case "_MONEY":
// PgArray.getArray() tries to convert the elements to Double instead of PgMoney
// due to an incorrect type mapping in the Postgres driver
// https://github.com/pgjdbc/pgjdbc/blob/d5ed52ef391670e83ae5265af2f7301c615ce4ca/pgjdbc/src/main/java/org/postgresql/jdbc/TypeInfoCache.java#L88
// and throws an exception, so a custom String-based conversion is used to read the
// value as is.
final String nativeMoneyValue = ((PgArray) x).toString();
final String substringM = Objects.requireNonNull(nativeMoneyValue).substring(1, nativeMoneyValue.length() - 1);
final char currency = substringM.charAt(0);
final String regex = "\\" + currency;
final List<String> myListM = new ArrayList<>(Arrays.asList(substringM.split(regex)));
return myListM.stream()
Review thread:

Contributor: I am not sure I understand why we are doing this replaceAll. Can you add a comment explaining why this is required?

Contributor (Author): The conversion from PgArray happens incorrectly and throws "Failed to convert PgArray: org.postgresql.util.PSQLException: Bad value for type double : $999.99", so for the money array I made a workaround conversion. The data in the PgArray looks something like this: {$999.99,"$1,001.01","$45,000.00",$1.00,$800.00,"$22,222.01","$1,001.01"}. To convert the values to double correctly, I first split by the currency sign and then remove every character except digits and dots (commas, quotes, etc.), which yields the correct double array: {999.99,1001.01,45000.00,1.00,800.00,22222.01,1001.01}.
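To make the workaround concrete, here is a minimal, self-contained sketch of the split-and-strip logic described above; the class name and sample input string are assumptions, standing in for the value the connector gets from PgArray.toString():

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MoneyArrayParseSketch {

  public static void main(final String[] args) {
    // Assumed sample of what PgArray.toString() returns for a MONEY[] column
    final String nativeMoneyValue = "{$999.99,\"$1,001.01\",\"$45,000.00\",$1.00,$800.00,\"$22,222.01\",\"$1,001.01\"}";
    // Strip the surrounding braces
    final String substring = nativeMoneyValue.substring(1, nativeMoneyValue.length() - 1);
    // The first character is the currency sign; escape it for the regex engine and split on it
    final char currency = substring.charAt(0);
    final List<Double> result = Arrays.stream(substring.split("\\" + currency))
        // Remove every character except digits and dots (commas, quotes, etc.)
        .map(val -> val.replaceAll("[^\\d.]", ""))
        .filter(money -> !money.isEmpty())
        .map(Double::valueOf)
        .collect(Collectors.toList());
    System.out.println(result); // [999.99, 1001.01, 45000.0, 1.0, 800.0, 22222.01, 1001.01]
  }
}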

// since the separator is the currency sign, all extra characters must be removed except for numbers
// and dots
.map(val -> val.replaceAll("[^\\d.]", ""))
.filter(money -> !money.isEmpty())
.map(Double::valueOf)
.collect(Collectors.toList());
case "_NUMERIC":
return Arrays.stream(values).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList());
case "_TIME":
return Arrays.stream(values).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList());
Review comment (Contributor): Nice! I like it when we are able to reuse code.

case "_DATE":
return Arrays.stream(values).map(value -> value == null ? null : convertToDate(value)).collect(Collectors.toList());
case "_TIMESTAMP":
return Arrays.stream(values).map(value -> value == null ? null : convertToTimestamp(value)).collect(Collectors.toList());
case "_TIMESTAMPTZ":
return Arrays.stream(values).map(value -> value == null ? null : convertToTimestampWithTimezone(value)).collect(Collectors.toList());
case "_TIMETZ":
Review thread:

Contributor: Why can't we use the convertToTimeWithTimezone method just like we are doing for the rest of the types?

Contributor (Author): Debezium handles TIMETZ and TIMETZ[] differently. For arrays, PgArray returns java.sql.Time rather than java.time.OffsetTime, so the customer's time zone is lost; for TIMETZ[] I use a custom implementation that transfers the data without losing the time zone.

Contributor: Let's add this logic to the convertToTimeWithTimezone method in class DateTimeConverter and add a condition like: if (time instanceof final java.sql.Time timetz) then do what you are doing.

Contributor (Author): @subodh1810 The approach of converting to java.sql.Time is not entirely correct, because in that case we lose the time zone information; that is why I did not use it. My implementation works on plain java.lang.String, and we should not add it to DateTimeConverter.convertToTimeWithTimezone(), since it only applies to Debezium and Postgres array processing.
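As a small illustration of this point, here is a standalone sketch (class name and sample element value are assumptions; the formatter pattern is the one used in the code below) showing that parsing the raw string into OffsetTime preserves the offset that a round-trip through java.sql.Time would lose:

import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;

public class TimetzParseSketch {

  public static void main(final String[] args) {
    // Assumed raw element from a PgArray string such as {13:00:01+05,14:30:00.123456-08}
    final String raw = "13:00:01+05";
    final DateTimeFormatter format = DateTimeFormatter.ofPattern("HH:mm:ss[.SSSSSS]X");

    // Parsing directly into OffsetTime keeps the original offset
    final OffsetTime parsed = OffsetTime.parse(raw, format);
    System.out.println(parsed); // 13:00:01+05:00

    // java.sql.Time has no offset field at all, which is why converting the
    // elements through it (as PgArray.getArray() does) loses the time zone.
  }
}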


final List<String> timetzArr = new ArrayList<>();
final String nativeValue = ((PgArray) x).toString();
final String substring = Objects.requireNonNull(nativeValue).substring(1, nativeValue.length() - 1);
final List<String> times = new ArrayList<>(Arrays.asList(substring.split(",")));
final DateTimeFormatter format = DateTimeFormatter.ofPattern("HH:mm:ss[.SSSSSS]X");

times.forEach(s -> {
if (s.equalsIgnoreCase("NULL")) {
timetzArr.add(null);
} else {
final OffsetTime parsed = OffsetTime.parse(s, format);
timetzArr.add(parsed.format(TIMETZ_FORMATTER));
}
});
return timetzArr;
case "_BYTEA":
return Arrays.stream(values).map(value -> Base64.getEncoder().encodeToString((byte[]) value)).collect(Collectors.toList());
case "_BIT":
return Arrays.stream(values).map(value -> (Boolean) value).collect(Collectors.toList());
case "_NAME":
return Arrays.stream(values).map(value -> (String) value).collect(Collectors.toList());
default:
return new ArrayList<>();
}
}

private int getTimePrecision(final RelationalColumn field) {
return field.scale().orElse(-1);
}
@@ -127,30 +225,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
case "TIMESTAMP":
if (x instanceof final Long l) {
if (getTimePrecision(field) <= 3) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMillis(l));
return convertToTimestamp(Conversions.toInstantFromMillis(l));
}
if (getTimePrecision(field) <= 6) {
return DateTimeConverter.convertToTimestamp(Conversions.toInstantFromMicros(l));
return convertToTimestamp(Conversions.toInstantFromMicros(l));
}
}
return DateTimeConverter.convertToTimestamp(x);
return convertToTimestamp(x);
case "DATE":
if (x instanceof Integer) {
return DateTimeConverter.convertToDate(LocalDate.ofEpochDay((Integer) x));
return convertToDate(LocalDate.ofEpochDay((Integer) x));
}
return DateTimeConverter.convertToDate(x);
return convertToDate(x);
case "TIME":
if (x instanceof Long) {
if (getTimePrecision(field) <= 3) {
long l = Math.multiplyExact((Long) x, TimeUnit.MILLISECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
if (getTimePrecision(field) <= 6) {
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
}
return DateTimeConverter.convertToTime(x);
return resolveTime(field, x);
case "INTERVAL":
return convertInterval((PGInterval) x);
default:
@@ -159,6 +247,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
});
}

private String resolveTime(RelationalColumn field, Object x) {
if (x instanceof Long) {
if (getTimePrecision(field) <= 3) {
long l = Math.multiplyExact((Long) x, TimeUnit.MILLISECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
if (getTimePrecision(field) <= 6) {
long l = Math.multiplyExact((Long) x, TimeUnit.MICROSECONDS.toNanos(1));
return DateTimeConverter.convertToTime(LocalTime.ofNanoOfDay(l));
}
}
return DateTimeConverter.convertToTime(x);
}

private String convertInterval(final PGInterval pgInterval) {
final StringBuilder resultInterval = new StringBuilder();
formatDateUnit(resultInterval, pgInterval.getYears(), " year ");
@@ -117,7 +117,7 @@ public void testDataTypes() throws Exception {
testDataHolders.forEach(testDataHolder -> {
if (testCatalog()) {
final AirbyteStream airbyteStream = streams.get(testDataHolder.getNameWithTestPrefix());
final Map<String, String> jsonSchemaTypeMap = (Map<String, String>) Jsons.deserialize(
final Map<String, Object> jsonSchemaTypeMap = (Map<String, Object>) Jsons.deserialize(
airbyteStream.getJsonSchema().get("properties").get(getTestColumnName()).toString(), Map.class);
assertEquals(testDataHolder.getAirbyteType().getJsonSchemaTypeMap(), jsonSchemaTypeMap,
"Expected column type for " + testDataHolder.getNameWithTestPrefix());
PostgresSource.java
@@ -57,7 +57,6 @@
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
@@ -73,7 +72,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {
public class PostgresSource extends AbstractJdbcSource<PostgresType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
@@ -211,8 +210,8 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
final List<TableInfo<CommonField<JDBCType>>> rawTables = discoverRawTables(database);
public List<TableInfo<CommonField<PostgresType>>> discoverInternal(final JdbcDatabase database) throws Exception {
final List<TableInfo<CommonField<PostgresType>>> rawTables = discoverRawTables(database);
final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc = PostgresCdcCatalogHelper.getPublicizedTables(database);

if (publicizedTablesInCdc.isEmpty()) {
@@ -224,15 +223,15 @@ public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
.collect(toList());
}

public List<TableInfo<CommonField<JDBCType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
public List<TableInfo<CommonField<PostgresType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
if (schemas != null && !schemas.isEmpty()) {
// process explicitly selected (from UI) schemas
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
final List<TableInfo<CommonField<PostgresType>>> internals = new ArrayList<>();
for (final String schema : schemas) {
LOGGER.info("Checking schema: {}", schema);
final List<TableInfo<CommonField<JDBCType>>> tables = super.discoverInternal(database, schema);
final List<TableInfo<CommonField<PostgresType>>> tables = super.discoverInternal(database, schema);
internals.addAll(tables);
for (final TableInfo<CommonField<JDBCType>> table : tables) {
for (final TableInfo<CommonField<PostgresType>> table : tables) {
LOGGER.info("Found table: {}.{}", table.getNameSpace(), table.getName());
}
}
@@ -322,7 +321,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final Map<String, TableInfo<CommonField<PostgresType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
@@ -471,14 +470,14 @@ public static void main(final String[] args) throws Exception {
@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)){
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) {
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(String.format(
"In CDC replication mode ssl value '%s' is invalid. Please use one of the following SSL modes: disable, require, verify-ca, verify-full",
sslModeValue));
.withStatus(Status.FAILED)
.withMessage(String.format(
"In CDC replication mode ssl value '%s' is invalid. Please use one of the following SSL modes: disable, require, verify-ca, verify-full",
sslModeValue));
}
}
}