Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Postgres: Remove varchar limit of 64k, defaults to 10MiB limit #34891

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.2
dockerImageTag: 0.6.3
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 0.6.2
dockerImageTag: 0.6.3
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ protected SQLDialect getDialect() {
return SQLDialect.POSTGRES;
}

@Override
public DataType<?> toDialectType(AirbyteProtocolType airbyteProtocolType) {
if (airbyteProtocolType.equals(AirbyteProtocolType.STRING)) {
// https://www.postgresql.org/docs/current/datatype-character.html
// If specified, the length n must be greater than zero and cannot exceed 10,485,760 (10 MB).
// If you desire to store long strings with no specific upper limit,
// use text or character varying without a length specifier,
// rather than making up an arbitrary length limit.
return SQLDataType.VARCHAR;
}
return super.toDialectType(airbyteProtocolType);
}

@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
final List<Sql> statements = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@

package io.airbyte.integrations.destination.postgres.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.JdbcCompatibleSourceOperations;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -28,6 +37,20 @@ public class PostgresTypingDedupingTest extends JdbcTypingDedupingTest {

protected static PostgresTestDatabase testContainer;

private static final int DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN = 65535;

private static final Random RANDOM = new Random();

private String generateBigString() {
// Generate exactly 2 chars over the limit
final int length = DEFAULT_VARCHAR_LIMIT_IN_JDBC_GEN + 2;
return RANDOM
.ints('a', 'z' + 1)
.limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

@BeforeAll
public static void setupPostgres() {
testContainer = PostgresTestDatabase.in(PostgresTestDatabase.BaseImage.POSTGRES_13);
Expand Down Expand Up @@ -107,4 +130,40 @@ protected List<JsonNode> dumpRawTableRecords(String streamNamespace, String stre
return super.dumpRawTableRecords(streamNamespace, streamName.toLowerCase());
}

@Test
public void testVarcharLimitOver64K() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(new AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA))));

final AirbyteMessage message = new AirbyteMessage();
final String largeString = generateBigString();
final Map<String, Object> data = ImmutableMap.of(
"id1", 1,
"id2", 200,
"updated_at", "2021-01-01T00:00:00Z",
"name", largeString);
message.setType(Type.RECORD);
message.setRecord(new AirbyteRecordMessage()
.withNamespace(streamNamespace)
.withStream(streamName)
.withData(Jsons.jsonNode(data))
.withEmittedAt(1000L));
final List<AirbyteMessage> messages1 = new ArrayList<>();
messages1.add(message);
runSync(catalog, messages1);

// Only assert on the large varchar string landing in final table.
// Rest of the fields' correctness is tested by other means in other tests.
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName);
assertEquals(1, actualFinalRecords.size());
assertEquals(largeString, actualFinalRecords.get(0).get("name").asText());

}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
| 0.6.1 | 2024-01-29 | [34630](https://github.com/airbytehq/airbyte/pull/34630) | CDK Upgrade; Use lowercase raw table in T+D queries. |
| 0.6.0 | 2024-01-19 | [34372](https://github.com/airbytehq/airbyte/pull/34372) | Add dv2 flag in spec |
Expand Down
Loading