From 2f6353a5d378c363d7b1ef0ce5f6cc7dcb9a6c93 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Mon, 5 Feb 2024 17:27:14 -0800 Subject: [PATCH 1/3] postgres/fix-varchar-limit --- .../typing_deduping/PostgresSqlGenerator.java | 13 +++ .../PostgresTypingDedupingTest.java | 88 +++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index 6b333ad1d382..0918226b3227 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -102,6 +102,19 @@ protected SQLDialect getDialect() { return SQLDialect.POSTGRES; } + @Override + public DataType toDialectType(AirbyteProtocolType airbyteProtocolType) { + if (airbyteProtocolType.equals(AirbyteProtocolType.STRING)) { + // https://www.postgresql.org/docs/current/datatype-character.html + // If specified, the length n must be greater than zero and cannot exceed 10,485,760 (10 MB). + // If you desire to store long strings with no specific upper limit, + // use text or character varying without a length specifier, + // rather than making up an arbitrary length limit. + return SQLDataType.VARCHAR; + } + return super.toDialectType(airbyteProtocolType); + } + @Override public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) { final List statements = new ArrayList<>(); diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java index f58b10efe1eb..e181b57d0d45 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java @@ -4,21 +4,32 @@ package io.airbyte.integrations.destination.postgres.typing_deduping; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.destination.postgres.PostgresDestination; import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Random; import javax.sql.DataSource; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -26,8 +37,46 @@ public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest { + public static class TestMessage { + + private final Map additionalProperties = new HashMap<>(); + + // Empty constructor for jackson + public TestMessage() {} + + @JsonAnyGetter + public Map getAdditionalProperties() { + return this.additionalProperties; + } + + @JsonAnySetter + public void setAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + } + + public TestMessage withAdditionalProperty(String name, Object value) { + this.additionalProperties.put(name, value); + return this; + } + + } + protected static PostgresTestDatabase testContainer; + private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; + + private static final Random RANDOM = new Random(); + + private String generateBigString() { + // Generate exactly 2 chars over the limit + final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2; + return RANDOM + .ints('a', 'z' + 1) + .limit(length) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + @BeforeAll public static void setupPostgres() { testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13); @@ -107,4 +156,43 @@ protected List dumpRawTableRecords(String streamNamespace, String stre return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase()); } + @Test + public void testVarcharLimitOver64K() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + + + final AirbyteMessage message = new AirbyteMessage(); + final String largeString = generateBigString(); + final TestMessage testMessage = + new TestMessage() + .withAdditionalProperty("id1", 1) + .withAdditionalProperty("id2", 200) + .withAdditionalProperty("updated_at", "2021-01-01T00:00:00Z") + .withAdditionalProperty("name", largeString); + message.setType(Type.RECORD); + message.setRecord(new AirbyteRecordMessage() + .withNamespace(streamNamespace) + .withStream(streamName) + .withData(Jsons.jsonNode(testMessage)) + .withEmittedAt(1000L)); + final List messages1 = new ArrayList<>(); + messages1.add(message); + runSync(catalog, messages1); + + // Only assert on the large varchar string landing in final table. + // Rest of the fields' correctness is tested by other means in other tests. + final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + assertEquals(1, actualFinalRecords.size()); + assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); + + } + } From d35c394343f9d11fd3b6c0ae827f828ad5abe51d Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 6 Feb 2024 12:02:59 -0800 Subject: [PATCH 2/3] version update --- .../destination-postgres-strict-encrypt/metadata.yaml | 2 +- .../connectors/destination-postgres/metadata.yaml | 2 +- .../typing_deduping/PostgresTypingDedupingTest.java | 8 +++----- docs/integrations/destinations/postgres.md | 1 + 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index 2b99bfe3d7e4..117c6570d909 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.2 + dockerImageTag: 0.6.3 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index 99b6d29ef6b5..bcb00a6873d5 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.2 + dockerImageTag: 0.6.3 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java index e181b57d0d45..c919d4e20212 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java @@ -163,11 +163,9 @@ public void testVarcharLimitOver64K() throws Exception { .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream(new AirbyteStream() - .withNamespace(streamNamespace) - .withName(streamName) - .withJsonSchema(SCHEMA)))); - - + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); final AirbyteMessage message = new AirbyteMessage(); final String largeString = generateBigString(); diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index 9552b38a3e84..3fa8fd2cb5f7 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults | | 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib | | 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. | | 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec | From fcf828eef1fe95dab0422a6f828c95079a88a716 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 6 Feb 2024 12:13:23 -0800 Subject: [PATCH 3/3] remove unneccessary class --- .../PostgresTypingDedupingTest.java | 41 ++++--------------- 1 file changed, 7 insertions(+), 34 deletions(-) diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java index c919d4e20212..22b525af74f7 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java @@ -6,10 +6,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; import io.airbyte.commons.json.Jsons; @@ -26,7 +25,6 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -37,30 +35,6 @@ public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest { - public static class TestMessage { - - private final Map additionalProperties = new HashMap<>(); - - // Empty constructor for jackson - public TestMessage() {} - - @JsonAnyGetter - public Map getAdditionalProperties() { - return this.additionalProperties; - } - - @JsonAnySetter - public void setAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - } - - public TestMessage withAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - return this; - } - - } - protected static PostgresTestDatabase testContainer; private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; @@ -169,17 +143,16 @@ public void testVarcharLimitOver64K() throws Exception { final AirbyteMessage message = new AirbyteMessage(); final String largeString = generateBigString(); - final TestMessage testMessage = - new TestMessage() - .withAdditionalProperty("id1", 1) - .withAdditionalProperty("id2", 200) - .withAdditionalProperty("updated_at", "2021-01-01T00:00:00Z") - .withAdditionalProperty("name", largeString); + final Map data = ImmutableMap.of( + "id1", 1, + "id2", 200, + "updated_at", "2021-01-01T00:00:00Z", + "name", largeString); message.setType(Type.RECORD); message.setRecord(new AirbyteRecordMessage() .withNamespace(streamNamespace) .withStream(streamName) - .withData(Jsons.jsonNode(testMessage)) + .withData(Jsons.jsonNode(data)) .withEmittedAt(1000L)); final List messages1 = new ArrayList<>(); messages1.add(message);