Skip to content

Commit

Permalink
destination-s3-glue: Add TableType and fix race condition (#22220)
Browse files Browse the repository at this point in the history
- The TableType attribute was not being populated in the Glue catalog, which leads to
errors when querying the table from engines such as Trino. The error message
observed is `Cannot invoke "String.equals(Object)" because "tableType" is null`
- There is a race condition in initializing the destination connector where a failure
occurs if multiple connections attempt to initialize at the same time, because the test
table that gets created is statically named. This adds a random suffix to the table to
avoid that race condition.
- The Airbyte sync ID and emitted_at fields are useful for deduplicating data. This adds
those fields to the table definition since they are already included in the records as written.
  • Loading branch information
blarghmatey authored Feb 8, 2023
1 parent 857bcac commit b0a42bf
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@
- name: S3 Glue
destinationDefinitionId: 471e5cab-8ed1-49f3-ba11-79c687784737
dockerRepository: airbyte/destination-s3-glue
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-glue
icon: s3-glue.svg
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5869,7 +5869,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-s3-glue:0.1.1"
- dockerImage: "airbyte/destination-s3-glue:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ ENV APPLICATION destination-s3-glue

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-s3-glue
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void upsertTable(String databaseName,
.withTableInput(
new TableInput()
.withName(tableName)
// .withTableType("GOVERNED")
.withTableType("EXTERNAL_TABLE")
.withStorageDescriptor(
new StorageDescriptor()
.withLocation(location)
Expand All @@ -80,7 +80,7 @@ public void upsertTable(String databaseName,
.withTableInput(
new TableInput()
.withName(tableName)
// .withTableType("GOVERNED")
.withTableType("EXTERNAL_TABLE")
.withStorageDescriptor(
new StorageDescriptor()
.withLocation(location)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package io.airbyte.integrations.destination.s3_glue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
Expand Down Expand Up @@ -85,6 +87,8 @@ private static Function<ConfiguredAirbyteStream, S3GlueWriteConfig> toWriteConfi
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
final JsonNode jsonSchema = abStream.getJsonSchema();
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_AB_ID, Map.of("type", "string"));
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, Map.of("type", "string"));
final String location = "s3://" + s3Config.getBucketName() + "/" +
fullOutputPath.substring(0, fullOutputPath.lastIndexOf("/") + 1);
final S3GlueWriteConfig writeConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,7 +45,9 @@ public AirbyteConnectionStatus check(JsonNode config) {
}
final GlueDestinationConfig glueConfig = GlueDestinationConfig.getInstance(config);
MetastoreOperations metastoreOperations = null;
String tableName = "test_table";
// If multiple syncs start at the same time, a static test table name causes a resource collision and a failure to sync.
String tableSuffix = RandomStringUtils.randomAlphabetic(9);
String tableName = "test_table_" + tableSuffix;
try {
metastoreOperations = new GlueOperations(glueConfig.getAWSGlueInstance());
metastoreOperations.upsertTable(glueConfig.getDatabase(), tableName, "s3://", Jsons.emptyObject(), glueConfig.getSerializationLibrary());
Expand Down
9 changes: 5 additions & 4 deletions docs/integrations/destinations/s3-glue.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ Output files can be compressed. The default option is GZIP compression. If compr

## CHANGELOG

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------- |
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.2 | 2023-02-01 | [22220](https://github.com/airbytehq/airbyte/pull/22220) | Fix race condition in test, table metadata, add Airbyte sync fields to table definition |
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |

0 comments on commit b0a42bf

Please sign in to comment.