diff --git a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml index 2b99bfe3d7e4..117c6570d909 100644 --- a/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres-strict-encrypt/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.2 + dockerImageTag: 0.6.3 dockerRepository: airbyte/destination-postgres-strict-encrypt documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/metadata.yaml b/airbyte-integrations/connectors/destination-postgres/metadata.yaml index 99b6d29ef6b5..bcb00a6873d5 100644 --- a/airbyte-integrations/connectors/destination-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/destination-postgres/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 25c5221d-dce2-4163-ade9-739ef790f503 - dockerImageTag: 0.6.2 + dockerImageTag: 0.6.3 dockerRepository: airbyte/destination-postgres documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres githubIssueLabel: destination-postgres diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java index 6b333ad1d382..0918226b3227 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresSqlGenerator.java @@ -102,6 +102,19 @@ protected SQLDialect getDialect() { return SQLDialect.POSTGRES; } + @Override + public DataType toDialectType(AirbyteProtocolType airbyteProtocolType) { + if (airbyteProtocolType.equals(AirbyteProtocolType.STRING)) { + // https://www.postgresql.org/docs/current/datatype-character.html + // If specified, the length n must be greater than zero and cannot exceed 10,485,760 (10 MB). + // If you desire to store long strings with no specific upper limit, + // use text or character varying without a length specifier, + // rather than making up an arbitrary length limit. + return SQLDataType.VARCHAR; + } + return super.toDialectType(airbyteProtocolType); + } + @Override public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) { final List statements = new ArrayList<>(); diff --git a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java index f58b10efe1eb..22b525af74f7 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test-integration/java/io/airbyte/integrations/destination/postgres/typing_deduping/PostgresTypingDedupingTest.java @@ -4,21 +4,30 @@ package io.airbyte.integrations.destination.postgres.typing_deduping; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import io.airbyte.cdk.db.JdbcCompatibleSourceOperations; import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest; +import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.destination.postgres.PostgresDestination; import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteMessage.Type; +import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Random; import javax.sql.DataSource; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -28,6 +37,20 @@ public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest { protected static PostgresTestDatabase testContainer; + private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535; + + private static final Random RANDOM = new Random(); + + private String generateBigString() { + // Generate exactly 2 chars over the limit + final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2; + return RANDOM + .ints('a', 'z' + 1) + .limit(length) + .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) + .toString(); + } + @BeforeAll public static void setupPostgres() { testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13); @@ -107,4 +130,40 @@ protected List dumpRawTableRecords(String streamNamespace, String stre return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase()); } + @Test + public void testVarcharLimitOver64K() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + final AirbyteMessage message = new AirbyteMessage(); + final String largeString = generateBigString(); + final Map data = ImmutableMap.of( + "id1", 1, + "id2", 200, + "updated_at", "2021-01-01T00:00:00Z", + "name", largeString); + message.setType(Type.RECORD); + message.setRecord(new AirbyteRecordMessage() + .withNamespace(streamNamespace) + .withStream(streamName) + .withData(Jsons.jsonNode(data)) + .withEmittedAt(1000L)); + final List messages1 = new ArrayList<>(); + messages1.add(message); + runSync(catalog, messages1); + + // Only assert on the large varchar string landing in final table. + // Rest of the fields' correctness is tested by other means in other tests. + final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + assertEquals(1, actualFinalRecords.size()); + assertEquals(largeString, actualFinalRecords.get(0).get("name").asText()); + + } + } diff --git a/docs/integrations/destinations/postgres.md b/docs/integrations/destinations/postgres.md index 9552b38a3e84..3fa8fd2cb5f7 100644 --- a/docs/integrations/destinations/postgres.md +++ b/docs/integrations/destinations/postgres.md @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------| +| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults | | 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib | | 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. | | 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |