S3-Parquet: added handler to process null values in arrays (#23788)
* [22807] Parquet - added handler for null values in arrays
etsybaev authored and erohmensing committed Mar 22, 2023
1 parent e0845bc commit 5176a32
Showing 18 changed files with 51 additions and 29 deletions.
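
The substance of this commit is one parquet-avro writer setting: the library's default legacy 2-level list encoding (WRITE_OLD_LIST_STRUCTURE=true) cannot represent a null array element, so records with nulls inside arrays failed to write. Disabling it switches the S3/GCS Parquet writers to the standard 3-level list encoding, whose element field is optional. Below is a minimal standalone sketch of the new behavior (not part of the commit; the class name, file path, and schema are illustrative):

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

public class NullableArrayExample {

  public static void main(final String[] args) throws Exception {
    // A record with one array column whose elements are nullable strings.
    final Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"example\",\"fields\":["
            + "{\"name\":\"string_array_column\",\"type\":"
            + "{\"type\":\"array\",\"items\":[\"null\",\"string\"]}}]}");

    final Configuration avroConfig = new Configuration();
    // The fix: the legacy 2-level list encoding has no slot for a null element,
    // so use the standard 3-level encoding, which marks elements as optional.
    avroConfig.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false);

    final Path path = new Path("/tmp/nullable-array-example.parquet");
    try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(
            HadoopOutputFile.fromPath(path, avroConfig))
        .withConf(avroConfig) // the write support reads the flag from the builder's config
        .withSchema(schema)
        .build()) {
      final Record record = new Record(schema);
      record.put("string_array_column", Arrays.asList("test_string", null));
      writer.write(record); // threw before the fix; succeeds with 3-level lists
    }
  }
}

Note the .withConf call: both writer classes changed below set the flag on the builder's configuration as well as on the one passed to HadoopOutputFile.fromPath, because the builder's configuration is the one the Avro write support reads; the config given to fromPath only sets up the target filesystem.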

@@ -40,7 +40,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 1.2.16
dockerImageTag: 1.2.17
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationConfig:
@@ -58,7 +58,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 1.2.16
dockerImageTag: 1.2.17
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
resourceRequirements:
@@ -145,7 +145,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.2.15
dockerImageTag: 0.2.16
documentationUrl: https://docs.airbyte.com/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
@@ -290,7 +290,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.4.2
dockerImageTag: 0.4.3
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
normalizationConfig:
@@ -321,7 +321,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.21
dockerImageTag: 0.3.22
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
@@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.53
dockerImageTag: 0.4.54
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:

@@ -630,7 +630,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-bigquery:1.2.16"
- dockerImage: "airbyte/destination-bigquery:1.2.17"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
@@ -840,7 +840,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.16"
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.17"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
@@ -2399,7 +2399,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-gcs:0.2.15"
- dockerImage: "airbyte/destination-gcs:0.2.16"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/gcs"
connectionSpecification:
@@ -5055,7 +5055,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.4.2"
- dockerImage: "airbyte/destination-redshift:0.4.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/redshift"
connectionSpecification:
@@ -5525,7 +5525,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.21"
- dockerImage: "airbyte/destination-s3:0.3.22"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
connectionSpecification:
@@ -6151,7 +6151,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.53"
- dockerImage: "airbyte/destination-snowflake:0.4.54"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:

@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.s3.parquet;

import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;

import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
@@ -70,8 +72,11 @@ public ParquetSerializedBuffer(final S3DestinationConfig config,
Files.deleteIfExists(bufferFile);
avroRecordFactory = new AvroRecordFactory(schema, AvroConstants.JSON_CONVERTER);
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
Configuration avroConfig = new Configuration();
avroConfig.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
parquetWriter = AvroParquetWriter.<Record>builder(HadoopOutputFile
.fromPath(new org.apache.hadoop.fs.Path(bufferFile.toUri()), new Configuration()))
.fromPath(new org.apache.hadoop.fs.Path(bufferFile.toUri()), avroConfig))
.withConf(avroConfig) // must be set on the builder too: the write support reads this config, not the one passed to fromPath above
.withSchema(schema)
.withCompressionCodec(formatConfig.getCompressionCodec())
.withRowGroupSize(formatConfig.getBlockSize())

@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.s3.parquet;

import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
@@ -69,7 +71,9 @@ public S3ParquetWriter(final S3DestinationConfig config,
final Path path = new Path(new URI(fullFilePath));
final S3ParquetFormatConfig formatConfig = (S3ParquetFormatConfig) config.getFormatConfig();
final Configuration hadoopConfig = getHadoopConfig(config);
hadoopConfig.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
this.parquetWriter = AvroParquetWriter.<Record>builder(HadoopOutputFile.fromPath(path, hadoopConfig))
.withConf(hadoopConfig) // must be set on the builder too: the write support reads this config, not the one passed to fromPath above
.withSchema(schema)
.withCompressionCodec(formatConfig.getCompressionCodec())
.withRowGroupSize(formatConfig.getBlockSize())

@@ -14,6 +14,7 @@
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
@@ -26,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@ public class ParquetSerializedBufferTest {
"column2", "string value",
"another field", true,
"nested_column", Map.of("array_column", List.of(1, 2, 3)),
"string_array_column", Stream.of("test_string", null).toList(),
"datetime_with_timezone", "2022-05-12T15:35:44.192950Z"));
private static final String STREAM = "stream1";
private static final AirbyteStreamNameNamespacePair streamPair = new AirbyteStreamNameNamespacePair(STREAM, null);
@@ -52,6 +55,8 @@ public class ParquetSerializedBufferTest {
Field.of("column2", JsonSchemaType.STRING),
Field.of("another field", JsonSchemaType.BOOLEAN),
Field.of("nested_column", JsonSchemaType.OBJECT),
Field.of("string_array_column", JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.STRING).build()),
Field.of("datetime_with_timezone", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE));
private static final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog(STREAM, null, FIELDS);

@@ -62,7 +67,7 @@ public void testUncompressedParquetWriter() throws Exception {
"format_type", "parquet"),
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
runTest(195L, 215L, config, getExpectedString());
runTest(225L, 245L, config, getExpectedString());
}

@Test
@@ -74,7 +79,7 @@ public void testCompressedParquetWriter() throws Exception {
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
// TODO: Compressed parquet is the same size as uncompressed??
runTest(195L, 215L, config, getExpectedString());
runTest(225L, 245L, config, getExpectedString());
}

private static String resolveArchitecture() {
@@ -119,14 +124,15 @@ private void runLzoParquetTest() throws Exception {
"compression_codec", "LZO"),
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
runTest(195L, 215L, config, getExpectedString());
runTest(225L, 245L, config, getExpectedString());
}

private static String getExpectedString() {
return "{\"_airbyte_ab_id\": \"<UUID>\", \"_airbyte_emitted_at\": \"<timestamp>\", "
+ "\"field1\": 10000.0, \"another_field\": true, "
+ "\"nested_column\": {\"_airbyte_additional_properties\": {\"array_column\": \"[1,2,3]\"}}, "
+ "\"column2\": \"string value\", "
+ "\"string_array_column\": [\"test_string\", null], "
+ "\"datetime_with_timezone\": 1652369744192000, "
+ "\"_airbyte_additional_properties\": null}";
}
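
For reference, a hypothetical read-back of the file produced by the first sketch (again not part of the commit) shows the null element surviving the round trip, matching the expected string asserted by this test:

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class NullableArrayReadBack {

  public static void main(final String[] args) throws Exception {
    final Path path = new Path("/tmp/nullable-array-example.parquet");
    try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(path, new Configuration()))
        .build()) {
      final GenericRecord record = reader.read();
      // Prints [test_string, null]: the null element is preserved.
      System.out.println(record.get("string_array_column"));
    }
  }
}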

@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.16
LABEL io.airbyte.version=1.2.17
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized

@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.2.16
LABEL io.airbyte.version=1.2.17
LABEL io.airbyte.name=airbyte/destination-bigquery

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.15
LABEL io.airbyte.version=0.2.16
LABEL io.airbyte.name=airbyte/destination-gcs

@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.2
LABEL io.airbyte.version=0.4.3
LABEL io.airbyte.name=airbyte/destination-redshift

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -40,5 +40,5 @@ RUN /bin/bash -c 'set -e && \
echo "unknown arch" ;\
fi'

LABEL io.airbyte.version=0.3.21
LABEL io.airbyte.version=0.3.22
LABEL io.airbyte.name=airbyte/destination-s3

@@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.53
LABEL io.airbyte.version=0.4.54
LABEL io.airbyte.name=airbyte/destination-snowflake