Skip to content

Commit

Permalink
destination-s3-glue: Add TableType and fix race condition
Browse files Browse the repository at this point in the history
- The TableType attribute was not being populated in the Glue catalog, which causes
errors when querying the table from engines such as Trino. The error message
observed is `Cannot invoke "String.equals(Object)" because "tableType" is null`
- There is a race condition in initializing the destination connector: a failure
occurs if multiple connections attempt to initialize at the same time, because the test
table that gets created is statically named. This adds a random suffix to the test table
name to avoid that race condition.
- The Airbyte sync ID and emitted_at fields are useful for deduplicating data. This adds
those fields to the table definition since they are already included in the records as written.
  • Loading branch information
blarghmatey committed Feb 8, 2023
1 parent 03da6c6 commit 97f6a97
Showing 7 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -334,7 +334,7 @@
- name: S3 Glue
destinationDefinitionId: 471e5cab-8ed1-49f3-ba11-79c687784737
dockerRepository: airbyte/destination-s3-glue
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-glue
icon: s3-glue.svg
releaseStage: alpha
Original file line number Diff line number Diff line change
@@ -5869,7 +5869,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-s3-glue:0.1.1"
- dockerImage: "airbyte/destination-s3-glue:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
connectionSpecification:
Original file line number Diff line number Diff line change
@@ -14,5 +14,5 @@ ENV APPLICATION destination-s3-glue

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-s3-glue
Original file line number Diff line number Diff line change
@@ -57,7 +57,7 @@ public void upsertTable(String databaseName,
.withTableInput(
new TableInput()
.withName(tableName)
// .withTableType("GOVERNED")
.withTableType("EXTERNAL_TABLE")
.withStorageDescriptor(
new StorageDescriptor()
.withLocation(location)
@@ -80,7 +80,7 @@ public void upsertTable(String databaseName,
.withTableInput(
new TableInput()
.withName(tableName)
// .withTableType("GOVERNED")
.withTableType("EXTERNAL_TABLE")
.withStorageDescriptor(
new StorageDescriptor()
.withLocation(location)
Original file line number Diff line number Diff line change
@@ -5,11 +5,13 @@
package io.airbyte.integrations.destination.s3_glue;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.functional.CheckedBiConsumer;
import io.airbyte.commons.functional.CheckedBiFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.integrations.destination.buffered_stream_consumer.OnCloseFunction;
@@ -85,6 +87,8 @@ private static Function<ConfiguredAirbyteStream, S3GlueWriteConfig> toWriteConfi
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
final JsonNode jsonSchema = abStream.getJsonSchema();
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_AB_ID, Map.of("type", "string"));
((ObjectNode)jsonSchema.get("properties")).putPOJO(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, Map.of("type", "string"));
final String location = "s3://" + s3Config.getBucketName() + "/" +
fullOutputPath.substring(0, fullOutputPath.lastIndexOf("/") + 1);
final S3GlueWriteConfig writeConfig =
Original file line number Diff line number Diff line change
@@ -20,6 +20,8 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.function.Consumer;

import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,7 +45,9 @@ public AirbyteConnectionStatus check(JsonNode config) {
}
final GlueDestinationConfig glueConfig = GlueDestinationConfig.getInstance(config);
MetastoreOperations metastoreOperations = null;
String tableName = "test_table";
// If multiple syncs are started at the same time, a static test table name causes a resource collision and a failure to sync.
String tableSuffix = RandomStringUtils.randomAlphabetic(9);
String tableName = "test_table_" + tableSuffix;
try {
metastoreOperations = new GlueOperations(glueConfig.getAWSGlueInstance());
metastoreOperations.upsertTable(glueConfig.getDatabase(), tableName, "s3://", Jsons.emptyObject(), glueConfig.getSerializationLibrary());
9 changes: 5 additions & 4 deletions docs/integrations/destinations/s3-glue.md
Original file line number Diff line number Diff line change
@@ -243,7 +243,8 @@ Output files can be compressed. The default option is GZIP compression. If compr

## CHANGELOG

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------- |
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------|
| 0.1.2 | 2023-02-01 | [22220](https://github.com/airbytehq/airbyte/pull/22220) | Fix race condition in test, table metadata, add Airbyte sync fields to table definition |
| 0.1.1 | 2022-12-13 | [19907](https://github.com/airbytehq/airbyte/pull/19907) | Fix parsing empty object in schema |
| 0.1.0 | 2022-11-17 | [18695](https://github.com/airbytehq/airbyte/pull/18695) | Initial Commit |

0 comments on commit 97f6a97

Please sign in to comment.