Skip to content

Commit

Permalink
[source-postgres + CDK] use BigInteger instead of BigInt (#48482)
Browse files Browse the repository at this point in the history
  • Loading branch information
theyueli authored Nov 13, 2024
1 parent 806f810 commit ffde35c
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 89 deletions.
17 changes: 9 additions & 8 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,16 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.48.0 | 2024-10-23 | [\#46302](https://github.com/airbytehq/airbyte/pull/46302) | Add support for file transfer |
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest|
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty|
| 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas|
| 0.48.1 | 2024-11-13 | [\#48482](https://github.com/airbytehq/airbyte/pull/48482) | Adding support converting very large numbers via BigInteger l |
| 0.48.0 | 2024-10-23 | [\#46302](https://github.com/airbytehq/airbyte/pull/46302) | Add support for file transfer |
| 0.47.3 | 2024-10-23 | [\#46689](https://github.com/airbytehq/airbyte/pull/46689) | Split DestinationAcceptanceTest |
| 0.47.2 | 2024-10-21 | [\#47216](https://github.com/airbytehq/airbyte/pull/47216) | improve java compatibiilty |
| 0.47.1 | 2024-09-27 | [\#45397](https://github.com/airbytehq/airbyte/pull/45397) | Allow logical replication from Postgres 16 read-replicas |
| 0.47.0 | 2024-09-26 | [\#42030](https://github.com/airbytehq/airbyte/pull/42030) | minor refactor |
| 0.46.1 | 2024-09-20 | [\#45700](https://github.com/airbytehq/airbyte/pull/45700) | Destinations: Fix bug in parsing jsonschema |
| 0.46.0 | 2024-09-18 | [\#45432](https://github.com/airbytehq/airbyte/pull/45432) | upgrade all libraries to latest version |
| 0.45.1 | 2024-09-17 | [\#45638](https://github.com/airbytehq/airbyte/pull/45638) | upgrade apache mina sshd to 2.13.2 to handle openssh tcpkeepalive requests |
| 0.45.0 | 2024-09-16 | [\#45469](https://github.com/airbytehq/airbyte/pull/45469) | Fix some race conditions, improve thread filtering, improve test logging |
| 0.46.1 | 2024-09-20 | [\#45700](https://github.com/airbytehq/airbyte/pull/45700) | Destinations: Fix bug in parsing jsonschema |
| 0.46.0 | 2024-09-18 | [\#45432](https://github.com/airbytehq/airbyte/pull/45432) | upgrade all libraries to latest version |
| 0.45.1 | 2024-09-17 | [\#45638](https://github.com/airbytehq/airbyte/pull/45638) | upgrade apache mina sshd to 2.13.2 to handle openssh tcpkeepalive requests |
| 0.45.0 | 2024-09-16 | [\#45469](https://github.com/airbytehq/airbyte/pull/45469) | Fix some race conditions, improve thread filtering, improve test logging |
| 0.44.22 | 2024-09-10 | [\#45368](https://github.com/airbytehq/airbyte/pull/45368) | Remove excessive debezium logging |
| 0.44.21 | 2024-09-04 | [\#45143](https://github.com/airbytehq/airbyte/pull/45143) | S3-destination: don't overwrite existing files, skip those file indexes instead |
| 0.44.20 | 2024-08-30 | [\#44933](https://github.com/airbytehq/airbyte/pull/44933) | Avro/Parquet destinations: handle `{}` schemas inside objects/arrays |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
node.put(columnName, DataTypeUtils.returnNullIfInvalid { resultSet.getLong(index) })
}

@Throws(SQLException::class)
protected fun putBigInteger(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
index: Int
) {
node.put(
columnName,
DataTypeUtils.returnNullIfInvalid { resultSet.getBigDecimal(index).toBigInteger() }
)
}

@Throws(SQLException::class)
protected open fun putDouble(
node: ObjectNode,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.48.1
version=0.48.2
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.47.1'
cdkVersionRequired = '0.48.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.22
dockerImageTag: 3.6.23
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -172,7 +173,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
// If a numeric_array column precision is not 0 AND scale is 0,
// then we know the precision and scale are purposefully chosen
if (metadata.getPrecision(colIndex) != 0 && metadata.getScale(colIndex) == 0) {
putBigIntArray(json, columnName, resultSet, colIndex);
putBigIntegerArray(json, columnName, resultSet, colIndex);
} else {
putBigDecimalArray(json, columnName, resultSet, colIndex);
}
Expand All @@ -197,7 +198,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case REAL -> putFloat(json, columnName, resultSet, colIndex);
case NUMERIC, DECIMAL -> {
if (metadata.getPrecision(colIndex) != 0 && metadata.getScale(colIndex) == 0) {
putBigInt(json, columnName, resultSet, colIndex);
putBigInteger(json, columnName, resultSet, colIndex);
} else {
putBigDecimal(json, columnName, resultSet, colIndex);
}
Expand Down Expand Up @@ -367,6 +368,16 @@ private void putBigIntArray(final ObjectNode node, final String columnName, fina
node.set(columnName, arrayNode);
}

private void putBigIntegerArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
final BigInteger value = DataTypeUtils.throwExceptionIfInvalid(() -> arrayResultSet.getBigDecimal(2).toBigInteger());
arrayNode.add(value);
}
node.set(columnName, arrayNode);
}

private void putDoubleArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage;
import io.airbyte.integrations.source.postgres.PostgresTestDatabase.ContainerModifier;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -54,7 +55,7 @@ public void numericColumnAsCursor() throws SQLException {
for (int i = 1; i <= 4; i++) {
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());
jsonNode.put("id", i);
final long cursorValue = i * 10;
final BigInteger cursorValue = BigInteger.valueOf(i * 10);
jsonNode.put(cursorColumn, cursorValue);
final String insertQuery = String.format("INSERT INTO %s VALUES (%s, %s);",
tableName,
Expand Down
Loading

0 comments on commit ffde35c

Please sign in to comment.