Skip to content

Commit

Permalink
Destination Postgres: Remove varchar limit of 64k, defaults to 10MiB …
Browse files Browse the repository at this point in the history
…limit (airbytehq#34891)
  • Loading branch information
gisripa authored and jatinyadav-cc committed Feb 26, 2024
1 parent 9dd696d commit b1dd44b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.2
dockerImageTag: 0.6.3
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.2
dockerImageTag: 0.6.3
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ protected SQLDialect getDialect() {
return SQLDialect.POSTGRES;
}

@Override
public DataType<?> toDialectType(AirbyteProtocolType airbyteProtocolType) {
if (airbyteProtocolType.equals(AirbyteProtocolType.STRING)) {
// https://www.postgresql.org/docs/current/datatype-character.html
// If specified, the length n must be greater than zero and cannot exceed 10,485,760 (10 MB).
// If you desire to store long strings with no specific upper limit,
// use text or character varying without a length specifier,
// rather than making up an arbitrary length limit.
return SQLDataType.VARCHAR;
}
return super.toDialectType(airbyteProtocolType);
}

@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
final List<Sql> statements = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@

package io.airbyte.integrations.destination.postgres.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -28,6 +37,20 @@ public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest {

protected static PostgresTestDatabase testContainer;

private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535;

private static final Random RANDOM = new Random();

private String generateBigString() {
// Generate exactly 2 chars over the limit
final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2;
return RANDOM
.ints('a', 'z' + 1)
.limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

@BeforeAll
public static void setupPostgres() {
testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13);
Expand Down Expand Up @@ -107,4 +130,40 @@ protected List<JsonNode> dumpRawTableRecords(String streamNamespace, String stre
return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase());
}

@Test
public void testVarcharLimitOver64K() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(new AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA))));

final AirbyteMessage message = new AirbyteMessage();
final String largeString = generateBigString();
final Map<String, Object> data = ImmutableMap.of(
"id1", 1,
"id2", 200,
"updated_at", "2021-01-01T00:00:00Z",
"name", largeString);
message.setType(Type.RECORD);
message.setRecord(new AirbyteRecordMessage()
.withNamespace(streamNamespace)
.withStream(streamName)
.withData(Jsons.jsonNode(data))
.withEmittedAt(1000L));
final List<AirbyteMessage> messages1 = new ArrayList<>();
messages1.add(message);
runSync(catalog, messages1);

// Only assert on the large varchar string landing in final table.
// Rest of the fields' correctness is tested by other means in other tests.
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName);
assertEquals(1, actualFinalRecords.size());
assertEquals(largeString, actualFinalRecords.get(0).get("name").asText());

}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
| 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. |
| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |
Expand Down

0 comments on commit b1dd44b

Please sign in to comment.