diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8b4a5d5a..5c64418dd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,18 +1,20 @@
 ## dbt-snowflake 1.0.0 (Release TBD)
 
 ### Features
-N/A
+- Adds option to enable retries on errors encountered by the Snowflake connector ([#14](https://github.com/dbt-labs/dbt-snowflake/issues/14))
 
 ### Fixes
 - When on_schema_change is set, pass common columns as dest_columns in incremental merge macros ([#4144](https://github.com/dbt-labs/dbt-core/issues/4144))
 
 ### Under the hood
 - Add optional profile parameters for atypical local connection setups ([#21](https://github.com/dbt-labs/dbt-snowflake/issues/21), [#36](https://github.com/dbt-labs/dbt-snowflake/pull/36))
+- Adds 4 optional profile parameters for configuring retries on Snowflake errors ([#14](https://github.com/dbt-labs/dbt-snowflake/issues/14), [#6](https://github.com/dbt-labs/dbt-snowflake/pull/6)) 
 - Bump upper bound on `snowflake-connector-python` to `<2.8.0` ([#44](https://github.com/dbt-labs/dbt-snowflake/pull/44))
 - Remove official support for python 3.6, which is reaching end of life on December 23, 2021 ([dbt-core#4134](https://github.com/dbt-labs/dbt-core/issues/4134), [#38](https://github.com/dbt-labs/dbt-snowflake/pull/45))
 
 ### Contributors
 - [@laxjesse](https://github.com/laxjesse) ([#36](https://github.com/dbt-labs/dbt-snowflake/pull/36))
+- [@mhmcdonald](https://github.com/mhmcdonald) ([#6](https://github.com/dbt-labs/dbt-snowflake/pull/6))
 
 ## dbt-snowflake v1.0.0b2 (October 25, 2021)
 
diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py
index fd7c5e637..78f66cc76 100644
--- a/dbt/adapters/snowflake/connections.py
+++ b/dbt/adapters/snowflake/connections.py
@@ -47,6 +47,10 @@ class SnowflakeCredentials(Credentials):
     proxy_host: Optional[str] = None
     proxy_port: Optional[int] = None
     protocol: Optional[str] = None
+    connect_retries: int = 0
+    connect_timeout: int = 10
+    retry_on_database_errors: bool = False
+    retry_all: bool = False
 
     def __post_init__(self):
         if (
@@ -233,38 +237,93 @@ def open(cls, connection):
             logger.debug('Connection is already open, skipping open.')
             return connection
 
-        try:
-            creds = connection.credentials
-
-            handle = snowflake.connector.connect(
-                account=creds.account,
-                user=creds.user,
-                database=creds.database,
-                schema=creds.schema,
-                warehouse=creds.warehouse,
-                role=creds.role,
-                autocommit=True,
-                client_session_keep_alive=creds.client_session_keep_alive,
-                application='dbt',
-                **creds.auth_args()
-            )
+        creds = connection.credentials
+        error = None
+        for attempt in range(1 + creds.connect_retries):
+            try:
+                handle = snowflake.connector.connect(
+                    account=creds.account,
+                    user=creds.user,
+                    database=creds.database,
+                    schema=creds.schema,
+                    warehouse=creds.warehouse,
+                    role=creds.role,
+                    autocommit=True,
+                    client_session_keep_alive=creds.client_session_keep_alive,
+                    application='dbt',
+                    **creds.auth_args()
+                )
+
+                if creds.query_tag:
+                    handle.cursor().execute(
+                        ("alter session set query_tag = '{}'")
+                        .format(creds.query_tag))
+
+                connection.handle = handle
+                connection.state = 'open'
+                break
 
-            if creds.query_tag:
-                handle.cursor().execute(
-                    ("alter session set query_tag = '{}'")
-                    .format(creds.query_tag))
+            except snowflake.connector.errors.DatabaseError as e:
+                if (creds.retry_on_database_errors or creds.retry_all) \
+                        and creds.connect_retries > 0:
+                    error = e
+                    logger.warning("Got an error when attempting to open a "
+                                   "snowflake connection. Retrying due to "
+                                   "either retry configuration set to true."
+                                   "This was attempt number: {attempt} of "
+                                   "{retry_limit}. "
+                                   "Retrying in {timeout} "
+                                   "seconds. Error: '{error}'"
+                                   .format(attempt=attempt,
+                                           retry_limit=creds.connect_retries,
+                                           timeout=creds.connect_timeout,
+                                           error=e))
+                    sleep(creds.connect_timeout)
+                else:
+                    logger.debug("Got an error when attempting to open a "
+                                 "snowflake connection. No retries "
+                                 "attempted: '{}'"
+                                 .format(e))
+
+                    connection.handle = None
+                    connection.state = 'fail'
+
+                    raise FailedToConnectException(str(e))
+
+            except snowflake.connector.errors.Error as e:
+                if creds.retry_all and creds.connect_retries > 0:
+                    error = e
+                    logger.warning("Got an error when attempting to open a "
+                                   "snowflake connection. Retrying due to "
+                                   "'retry_all' configuration set to true."
+                                   "This was attempt number: {attempt} of "
+                                   "{retry_limit}. "
+                                   "Retrying in {timeout} "
+                                   "seconds. Error: '{error}'"
+                                   .format(attempt=attempt,
+                                           retry_limit=creds.connect_retries,
+                                           timeout=creds.connect_timeout,
+                                           error=e))
+                    sleep(creds.connect_timeout)
+                else:
+                    logger.debug("Got an error when attempting to open a "
+                                 "snowflake connection. No retries "
+                                 "attempted: '{}'"
+                                 .format(e))
+
+                    connection.handle = None
+                    connection.state = 'fail'
+
+                    raise FailedToConnectException(str(e))
 
-            connection.handle = handle
-            connection.state = 'open'
-        except snowflake.connector.errors.Error as e:
+        else:
             logger.debug("Got an error when attempting to open a snowflake "
                          "connection: '{}'"
-                         .format(e))
+                         .format(error))
 
             connection.handle = None
             connection.state = 'fail'
-
-            raise FailedToConnectException(str(e))
+            raise FailedToConnectException(str(error))
 
     def cancel(self, connection):
         handle = connection.handle