Skip to content

Commit

Permalink
🐛Destination-Snowflake: updated check method to handle more possible …
Browse files Browse the repository at this point in the history
…s3 and gcs stagings issues (#21450)

* [18312] Destination-Snowflake: updated check method to handle more possible s3 and gcs stagings issues
  • Loading branch information
etsybaev authored Jan 20, 2023
1 parent 5257f69 commit 2d65d62
Show file tree
Hide file tree
Showing 12 changed files with 115 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.42
dockerImageTag: 0.4.43
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationConfig:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6109,7 +6109,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.42"
- dockerImage: "airbyte/destination-snowflake:0.4.43"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class CsvSerializedBuffer extends BaseSerializedBuffer {
private CSVPrinter csvPrinter;
private CSVFormat csvFormat;

protected CsvSerializedBuffer(final BufferStorage bufferStorage,
final CsvSheetGenerator csvSheetGenerator,
final boolean compression)
public CsvSerializedBuffer(final BufferStorage bufferStorage,
final CsvSheetGenerator csvSheetGenerator,
final boolean compression)
throws Exception {
super(bufferStorage);
this.csvSheetGenerator = csvSheetGenerator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void performCreateInsertTestOnDestination(final String outputSchema,
final JdbcDatabase database,
final NamingConventionTransformer nameTransformer)
throws Exception {
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());
AbstractJdbcDestination.attemptTableOperations(outputSchema, database, nameTransformer, getSqlOperations(), true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.version=0.4.43
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package io.airbyte.integrations.destination.snowflake;

import static io.airbyte.integrations.destination.snowflake.SnowflakeS3StagingDestination.isPurgeStagingData;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
Expand All @@ -29,6 +31,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -83,12 +86,19 @@ public AirbyteConnectionStatus check(final JsonNode config) {
}

private static void attemptWriteAndDeleteGcsObject(final GcsConfig gcsConfig, final String outputTableName) throws IOException {
final var storage = getStorageClient(gcsConfig);
final var blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
final var blobInfo = BlobInfo.newBuilder(blobId).build();
final Storage storageClient = getStorageClient(gcsConfig);
final BlobId blobId = BlobId.of(gcsConfig.getBucketName(), "check-content/" + outputTableName);
final BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();

storage.create(blobInfo, "".getBytes(StandardCharsets.UTF_8));
storage.delete(blobId);
storageClient.create(blobInfo);

try (WriteChannel writer = storageClient.writer(blobInfo)) {
// Try to write a dummy message to make sure user has all required permissions
final byte[] content = "Hello, World!".getBytes(UTF_8);
writer.write(ByteBuffer.wrap(content, 0, content.length));
} finally {
storageClient.delete(blobId);
}
}

public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void createStageIfNotExists(final JdbcDatabase database, final String sta
}

/**
* Creates a SQL query to create a staging folder. This query will create a staging folder if one previously did not exist
* Creates a SQL query to create a staging folder. This query will create a staging folder if one
* previously did not exist
*
* @param stageName name of the staging folder
* @return SQL query string
Expand All @@ -157,8 +158,8 @@ public void copyIntoTmpTableFromStage(final JdbcDatabase database,
}

/**
* Creates a SQL query to bulk copy data into fully qualified destination table
* See https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
* Creates a SQL query to bulk copy data into fully qualified destination table See
* https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html for more context
*
* @param stageName name of staging folder
* @param stagingPath path of staging folder to data files
Expand Down Expand Up @@ -200,8 +201,8 @@ public void cleanUpStage(final JdbcDatabase database, final String stageName, fi
}

/**
* Creates a SQL query used to remove staging files that were just staged
* See https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
* Creates a SQL query used to remove staging files that were just staged See
* https://docs.snowflake.com/en/sql-reference/sql/remove.html for more context
*
* @param stageName name of staging folder
* @return SQL query string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.InMemoryBuffer;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption;
import io.airbyte.integrations.destination.s3.AesCbcEnvelopeEncryption.KeyType;
import io.airbyte.integrations.destination.s3.EncryptionConfig;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.csv.CsvSerializedBuffer;
import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.Map;
Expand Down Expand Up @@ -90,7 +93,26 @@ private static void attemptStageOperations(final String outputSchema,
final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID());
final String stageName = sqlOperations.getStageName(outputSchema, outputTableName);
sqlOperations.createStageIfNotExists(database, stageName);
sqlOperations.dropStageIfExists(database, stageName);

// try to make test write to make sure we have required role
try {
final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer(
new InMemoryBuffer(".csv"),
new StagingDatabaseCsvSheetGenerator(),
true);

// create a dummy stream\records that will bed used to test uploading
csvSerializedBuffer.accept(new AirbyteRecordMessage()
.withData(Jsons.jsonNode(Map.of("testKey", "testValue")))
.withEmittedAt(System.currentTimeMillis()));
csvSerializedBuffer.flush();

sqlOperations.uploadRecordsToStage(database, csvSerializedBuffer, outputSchema, stageName,
stageName.endsWith("/") ? stageName : stageName + "/");
} finally {
// drop created tmp stage
sqlOperations.dropStageIfExists(database, stageName);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,23 @@

package io.airbyte.integrations.destination.snowflake;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;

public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {

private static final String NO_GCS_PRIVILEGES_ERR_MSG =
"Permission 'storage.objects.create' denied on resource (or it may not exist).";

@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json")));
Expand All @@ -20,4 +29,16 @@ public JsonNode getStaticConfig() {
return copyConfig;
}

@Test
public void testCheckWithNoProperGcsPermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/copy_insufficient_gcs_roles_config.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_GCS_PRIVILEGES_ERR_MSG);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@

package io.airbyte.integrations.destination.snowflake;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;

public class SnowflakeS3CopyEncryptedDestinationAcceptanceTest extends SnowflakeInsertDestinationAcceptanceTest {

private static final String NO_S3_PRIVILEGES_ERR_MSG = "Could not connect with provided configuration.";

@Override
public JsonNode getStaticConfig() {
final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_s3_encrypted_config.json")));
Expand All @@ -20,4 +28,16 @@ public JsonNode getStaticConfig() {
return copyConfig;
}

@Test
public void testCheckWithNoProperS3PermissionConnection() throws Exception {
// Config to user (creds) that has no permission to schema
final JsonNode config = Jsons.deserialize(IOs.readFile(
Path.of("secrets/copy_s3_wrong_location_config.json")));

StandardCheckConnectionOutput standardCheckConnectionOutput = runCheck(config);

assertEquals(Status.FAILED, standardCheckConnectionOutput.getStatus());
assertThat(standardCheckConnectionOutput.getMessage()).contains(NO_S3_PRIVILEGES_ERR_MSG);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.destination.snowflake;

import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLDate;
import static io.airbyte.db.jdbc.DateTimeConverter.putJavaSQLTime;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTestUtils;
Expand All @@ -17,4 +20,22 @@ protected void putString(ObjectNode node, String columnName, ResultSet resultSet
DestinationAcceptanceTestUtils.putStringIntoJson(resultSet.getString(index), columnName, node);
}

@Override
protected void putDate(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index)
throws SQLException {
putJavaSQLDate(node, columnName, resultSet, index);
}

@Override
protected void putTime(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index)
throws SQLException {
putJavaSQLTime(node, columnName, resultSet, index);
}

}
3 changes: 2 additions & 1 deletion docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ Now that you have set up the Snowflake destination connector, check out the foll
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.41 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.43 | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated Check methods to handle more possible s3 and gcs stagings issues |
| 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams |
| 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
Expand Down

0 comments on commit 2d65d62

Please sign in to comment.