diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index b05591295659..cf7d16903dc6 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -348,7 +348,7 @@ - name: Snowflake destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.4.44 + dockerImageTag: 0.4.45 documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake icon: snowflake.svg normalizationConfig: diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 7f34e408d5b9..dac22b69043c 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -6109,7 +6109,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.44" +- dockerImage: "airbyte/destination-snowflake:0.4.45" spec: documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java index 2ba0530f51cb..627e0def2f44 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.jdbc; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; @@ -43,12 +44,27 @@ protected JdbcSqlOperations(final DataAdapter dataAdapter) { @Override public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception { - if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) { - database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)); - schemaSet.add(schemaName); + try { + if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) { + database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)); + schemaSet.add(schemaName); + } + } catch (Exception e) { + throw checkForKnownConfigExceptions(e).orElseThrow(() -> e); } } + /** + * When an exception occurs, we may recognize it as an issue with the users permissions + * or other configuration options. In these cases, we can wrap the exception in a {@link ConfigErrorException} + * which will exclude the error from our on-call paging/reporting + * @param e the exception to check. + * @return A ConfigErrorException with a message with actionable feedback to the user. + */ + protected Optional checkForKnownConfigExceptions(Exception e) { + return Optional.empty(); + } + @Override public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException { database.execute(createTableQuery(database, schemaName, tableName)); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/TestJdbcSqlOperations.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/TestJdbcSqlOperations.java index c6dd736a3b81..d573aa6a46fa 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/TestJdbcSqlOperations.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/TestJdbcSqlOperations.java @@ -6,7 +6,12 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import java.sql.SQLException; import java.util.List; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; public class TestJdbcSqlOperations extends JdbcSqlOperations { @@ -19,4 +24,18 @@ public void insertRecordsInternal(final JdbcDatabase database, // Not required for the testing } + @Test + public void testCreateSchemaIfNotExists() { + final JdbcDatabase db = Mockito.mock(JdbcDatabase.class); + final var schemaName = "foo"; + try { + Mockito.doThrow(new SQLException("TEST")).when(db).execute(Mockito.anyString()); + } catch (Exception e) { + // This would not be expected, but the `execute` method above will flag as an unhandled exception + assert false; + } + SQLException exception = Assertions.assertThrows(SQLException.class, () -> createSchemaIfNotExists(db, schemaName)); + Assertions.assertEquals(exception.getMessage(), "TEST"); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index da6515de1feb..28c1fe2763cd 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=0.4.44 +LABEL io.airbyte.version=0.4.45 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index 5675dcd40367..177227c3750d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.snowflake; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; @@ -13,8 +14,10 @@ import io.airbyte.protocol.models.v0.AirbyteRecordMessage; import java.sql.SQLException; import java.util.List; +import java.util.Optional; import java.util.StringJoiner; import java.util.stream.Stream; +import net.snowflake.client.jdbc.SnowflakeSQLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +26,10 @@ class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); private static final int MAX_FILES_IN_LOADING_QUERY_LIMIT = 1000; + // This is an unfortunately fragile way to capture this, but Snowflake doesn't + // provide a more specific permission exception error code + private static final String NO_PRIVILEGES_ERROR_MESSAGE = "but current role has no privileges on it"; + @Override public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { return String.format( @@ -73,4 +80,12 @@ protected String generateFilesList(final List files) { } } + @Override + protected Optional checkForKnownConfigExceptions(Exception e) { + if (e instanceof SnowflakeSQLException && e.getMessage().contains(NO_PRIVILEGES_ERROR_MESSAGE)) { + return Optional.of(new ConfigErrorException("Encountered Error with Snowflake Configuration: Current role does not have permissions on the target schema please verify your privileges", e)); + } + return Optional.empty(); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java index cd38967a2578..b0f766fcad16 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java @@ -47,7 +47,7 @@ public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAccepta "No active warehouse selected in the current session. Select an active warehouse with the 'use warehouse' command."; protected static final String NO_USER_PRIVILEGES_ERR_MSG = - "Schema 'TEXT_SCHEMA' already exists, but current role has no privileges on it."; + "Encountered Error with Snowflake Configuration: Current role does not have permissions on the target schema please verify your privileges"; // this config is based on the static config, and it contains a random // schema name that is different for each test run diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java index dfaf972e0de5..b52218e31354 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java @@ -11,6 +11,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; @@ -18,7 +19,12 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import net.snowflake.client.jdbc.SnowflakeSQLException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mockito; class SnowflakeSqlOperationsTest { @@ -53,4 +59,24 @@ void insertRecordsInternal() throws SQLException { verify(db, times(1)).execute(any(CheckedConsumer.class)); } + @ParameterizedTest + @CsvSource({"TEST,false", "but current role has no privileges on it,true"}) + public void testCreateSchemaIfNotExists(final String message, final boolean shouldCapture) { + final JdbcDatabase db = Mockito.mock(JdbcDatabase.class); + final var schemaName = "foo"; + try { + Mockito.doThrow(new SnowflakeSQLException(message)).when(db).execute(Mockito.anyString()); + } catch (Exception e) { + // This would not be expected, but the `execute` method above will flag as an unhandled exception + assert false; + } + Exception exception = Assertions.assertThrows(Exception.class, () -> snowflakeSqlOperations.createSchemaIfNotExists(db, schemaName)); + if (shouldCapture) { + Assertions.assertInstanceOf(ConfigErrorException.class, exception); + } else { + Assertions.assertInstanceOf(SnowflakeSQLException.class, exception); + Assertions.assertEquals(exception.getMessage(), message); + } + } + } diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 1bbadee5053a..4922baae8a19 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -272,12 +272,23 @@ Now that you have set up the Snowflake destination connector, check out the foll - [Migrate your data from Redshift to Snowflake](https://airbyte.com/tutorials/redshift-to-snowflake) - [Orchestrate ELT pipelines with Prefect, Airbyte and dbt](https://airbyte.com/tutorials/elt-pipeline-prefect-airbyte-dbt) +## Troubleshooting + +### 'Current role does not have permissions on the target schema' +If you receive an error stating `Current role does not have permissions on the target schema` make sure that the +Snowflake destination `SCHEMA` is one that the role you've provided has permissions on. When creating a connection, +it may allow you to select `Mirror source structure` for the `Destination namespace`, which if you have followed +some of our default examples and tutorials may result in the connection trying to write to a `PUBLIC` schema. + +A quick fix could be to edit your connection's 'Replication' settings from `Mirror source structure` to `Destination Default`. +Otherwise, make sure to grant the role the required permissions in the desired namespace. ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.4.44 | 2023-01-20 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions | +| 0.4.45 | 2023-01-25 | [#21087](https://github.com/airbytehq/airbyte/pull/21764) | Catch Known Permissions and rethrow as ConfigExceptions | +| 0.4.44 | 2023-01-20 | [#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions | | 0.4.43 | 2023-01-20 | [\#21450](https://github.com/airbytehq/airbyte/pull/21450) | Updated Check methods to handle more possible s3 and gcs stagings issues | | 0.4.42 | 2023-01-12 | [\#21342](https://github.com/airbytehq/airbyte/pull/21342) | Better handling for conflicting destination streams | | 0.4.41 | 2022-12-16 | [\#20566](https://github.com/airbytehq/airbyte/pull/20566) | Improve spec to adhere to standards |