Fix for "table dropped by concurrent query" on Redshift #825

Merged · 3 commits · Jul 9, 2018 · Changes from all commits
2 changes: 1 addition & 1 deletion Makefile
@@ -15,7 +15,7 @@ test-unit:

test-integration:
@echo "Integration test run starting..."
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py36,integration-snowflake-py27,integration-snowflake-py36,integration-bigquery-py27,integration-bigquery-py36
@time docker-compose run test tox -e integration-postgres-py27,integration-postgres-py36,integration-redshift-py27,integration-redshift-py36,integration-snowflake-py27,integration-snowflake-py36,integration-bigquery-py27,integration-bigquery-py36

test-quick:
@echo "Integration test run starting..."
3 changes: 2 additions & 1 deletion dbt/adapters/default/impl.py
@@ -162,7 +162,8 @@ def drop_relation(cls, profile, project_cfg, relation, model_name=None):

        sql = 'drop {} if exists {} cascade'.format(relation.type, relation)

        connection, cursor = cls.add_query(profile, sql, model_name)
        connection, cursor = cls.add_query(profile, sql, model_name,
Contributor Author:
I had to make this change because dbt was previously adding a `begin` statement before the drop. Since the drop now happens after the transaction, there was no corresponding `commit`, and the transaction containing the drop was rolled back.

I checked the other places where dbt uses this method, and I believe this change should be safe, but I'm very open to improving how dbt handles transactions here.
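
To make that concrete, the statement sequence before and after this change looks roughly like this (a sketch with illustrative names, not the literal SQL dbt logs):

```sql
-- before: add_query() auto-began a transaction around the post-commit drop
begin;
drop table if exists "schema"."model__dbt_backup" cascade;
-- no commit ever followed, so the drop was rolled back

-- after: with auto_begin=False, the drop executes outside any transaction
drop table if exists "schema"."model__dbt_backup" cascade;
```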

                                           auto_begin=False)

    @classmethod
    def truncate(cls, profile, project_cfg, schema, table, model_name=None):
15 changes: 12 additions & 3 deletions dbt/include/global_project/macros/materializations/table/table.sql
@@ -1,6 +1,7 @@
{% materialization table, default %}
{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = identifier + '__dbt_tmp' -%}
{%- set backup_identifier = identifier + '__dbt_backup' -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
@@ -10,15 +11,18 @@
schema=schema, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, type='table') -%}
{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
{%- set create_as_temporary = (exists_as_table and non_destructive_mode) -%}


-- drop the temp relation if it exists for some reason
-- drop the temp relations if they exist for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ adapter.drop_relation(backup_relation) }}

-- setup: if the target relation already exists, truncate or drop it
-- setup: if the target relation already exists, truncate or drop it (if it's a view)
{% if non_destructive_mode -%}
{% if exists_as_table -%}
{{ adapter.truncate_relation(old_relation) }}
@@ -60,12 +64,17 @@
{% if non_destructive_mode -%}
-- noop
{%- else -%}
{{ drop_relation_if_exists(old_relation) }}
{% if exists_as_table %}
-- move the existing table out of the way
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}

{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

-- `COMMIT` happens here
{{ adapter.commit() }}
{{ drop_relation_if_exists(backup_relation) }}
Member:
would this logic fail if the backup relation wasn't deleted on the previous run? i.e.:

run 1:
  • dbt renames target to backup, commit succeeds, but then it fails to drop backup here

run 2:
  • ERROR: relation already exists?

i'd imagine you could fix this by also dropping the backup relation at the start of the run, if it exists.
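
Sketched out, the scenario described above would be (names illustrative):

```sql
-- run 1: the rename and commit succeed, but the post-commit drop fails
alter table "schema"."model" rename to "model__dbt_backup";
commit;
-- the drop of "model__dbt_backup" never happens

-- run 2: without first cleaning up the leftover backup, the rename collides
alter table "schema"."model" rename to "model__dbt_backup";
-- ERROR: relation "model__dbt_backup" already exists
```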

Contributor Author:
really good point. We do something similar for the __dbt_tmp relation -- i will update!


{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endmaterialization %}
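
For reference, the SQL this materialization now emits when rebuilding an existing table looks roughly like this (a sketch with illustrative names; the key point is that the old table is renamed inside the transaction and only dropped after `COMMIT`):

```sql
begin;
create table "schema"."model__dbt_tmp" as (/* model SQL */);
-- move the existing table out of the way rather than dropping it mid-transaction
alter table "schema"."model" rename to "model__dbt_backup";
alter table "schema"."model__dbt_tmp" rename to "model";
commit;
-- only after the commit, outside the transaction:
drop table if exists "schema"."model__dbt_backup" cascade;
```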
12 changes: 11 additions & 1 deletion dbt/include/global_project/macros/materializations/view/view.sql
@@ -2,6 +2,7 @@

{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = identifier + '__dbt_tmp' -%}
{%- set backup_identifier = identifier + '__dbt_backup' -%}
{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
@@ -11,14 +12,19 @@
type='view') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='view') -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema, type='view') -%}

{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

{%- set has_transactional_hooks = (hooks | selectattr('transaction', 'equalto', True) | list | length) > 0 %}
{%- set should_ignore = non_destructive_mode and exists_as_view %}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- drop the temp relations if they exist for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ adapter.drop_relation(backup_relation) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
@@ -45,7 +51,10 @@

-- cleanup
{% if not should_ignore -%}
{{ drop_relation_if_exists(old_relation) }}
-- move the existing view out of the way
{% if exists_as_view %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{%- endif %}

@@ -55,6 +64,7 @@
#}
{% if has_transactional_hooks or not should_ignore %}
{{ adapter.commit() }}
{{ drop_relation_if_exists(backup_relation) }}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
6 changes: 6 additions & 0 deletions test.env.sample
@@ -15,3 +15,9 @@ BIGQUERY_AUTH_URI=
BIGQUERY_TOKEN_URI=
BIGQUERY_AUTH_PROVIDER_X509_CERT_URL=
BIGQUERY_CLIENT_X509_CERT_URL=

REDSHIFT_TEST_HOST=
REDSHIFT_TEST_USER=
REDSHIFT_TEST_PASS=
REDSHIFT_TEST_PORT=
REDSHIFT_TEST_DBNAME=
33 changes: 33 additions & 0 deletions test/integration/032_concurrent_transaction_test/README.md
@@ -0,0 +1,33 @@

This test warrants some explanation. In dbt <=0.10.1, Redshift table and view materializations suffered from issues around concurrent transactions. In order to reliably reproduce this error, a query needs to select from a dbt model as the table is being rebuilt. Critically, this concurrent select needs to query the table during the drop/swap portion of the materialization. This looks like:

```sql
begin;
create table new_table as (...);
drop table old_table cascade;
-- <---- The concurrent query needs to be running here!
alter table new_table rename to old_table;
commit;
```

In order to reliably reproduce this failure, the materialization shown above needs to block for a long time between the `drop` and `alter` statements. We can't just stick a sleep() call in there, as this code is defined in the materialization. Instead, we reproduce the failure by:

1) creating a view that depends on this model
2) issuing a long-running query on the view before `dbt run` is invoked
3) issuing _another_ long-running query against the original model

Since the long-running query (step 2) is selecting from the view, Redshift blocks on the materialization's `drop ... cascade`, which causes the query from step 3 to overlap with the critical section of the materialization between the `drop` and `alter` statements.
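
Sketched as the two concurrent sessions the test spins up (the schema name is illustrative; the sleep durations match the test below):

```sql
-- session A (step 2): selects from the dependent view, so Redshift blocks the
-- materialization's `drop ... cascade` until this query completes
select test_schema.f_sleep(10) from test_schema.view_model;

-- session B (step 3): issued slightly later, so it is still running during the
-- window between the materialization's `drop` and `alter` statements
select test_schema.f_sleep(5) from test_schema.model_1;
```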

In dbt v0.10.1, this integration test results in:

```
======================================================================
FAIL: test__redshift__concurrent_transaction (test_concurrent_transaction.TestConcurrentTransaction)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/src/app/test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py", line 84, in test__redshift__concurrent_transaction
self.assertEqual(self.query_state['model_1'], 'good')
AssertionError: 'error: table 3379442 dropped by concurrent transaction\n' != 'good'
- error: table 3379442 dropped by concurrent transaction
+ good
```
13 changes: 13 additions & 0 deletions test/integration/032_concurrent_transaction_test/macros/udfs.sql
@@ -0,0 +1,13 @@

{% macro create_udfs() %}

CREATE OR REPLACE FUNCTION {{ target.schema }}.f_sleep (x float)
RETURNS bool IMMUTABLE
AS
$$
from time import sleep
sleep(x)
return True
$$ LANGUAGE plpythonu;

{% endmacro %}
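
A minimal sketch of how the tests use this UDF (schema name illustrative): the function runs once per scanned row, so selecting it from a one-row model blocks for roughly `x` seconds.

```sql
select test_schema.f_sleep(10) from test_schema.model_1;
```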
5 changes: 5 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-incremental/model_1.sql
@@ -0,0 +1,5 @@

{{ config(materialized='incremental', sql_where=True, unique_key='id') }}

-- incremental model
select 1 as id
3 changes: 3 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-incremental/view_model.sql
@@ -0,0 +1,3 @@


select * from {{ ref('model_1') }}
5 changes: 5 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-table/model_1.sql
@@ -0,0 +1,5 @@

{{ config(materialized='table') }}

-- table model
select 1 as id
3 changes: 3 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-table/view_model.sql
@@ -0,0 +1,3 @@


select * from {{ ref('model_1') }}
5 changes: 5 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-view/model_1.sql
@@ -0,0 +1,5 @@

{{ config(materialized='view') }}

-- view model
select 1 as id
3 changes: 3 additions & 0 deletions test/integration/032_concurrent_transaction_test/models-view/view_model.sql
@@ -0,0 +1,3 @@


select * from {{ ref('model_1') }}
111 changes: 111 additions & 0 deletions test/integration/032_concurrent_transaction_test/test_concurrent_transaction.py
@@ -0,0 +1,111 @@
from nose.plugins.attrib import attr
from test.integration.base import DBTIntegrationTest
import threading

class BaseTestConcurrentTransaction(DBTIntegrationTest):

    def reset(self):
        self.query_state = {
            'view_model': 'wait',
            'model_1': 'wait',
        }

    @property
    def schema(self):
        return "concurrent_transaction_032"

    @property
    def project_config(self):
        return {
            "macro-paths": ["test/integration/032_concurrent_transaction_test/macros"],
            "on-run-start": [
                "{{ create_udfs() }}",
            ],
        }

    def run_select_and_check(self, rel, sql):
        try:
            res = self.run_sql(sql, fetch='one')

            # The result is the output of f_sleep(), which is True
            if res[0] == True:
                self.query_state[rel] = 'good'
            else:
                self.query_state[rel] = 'bad'

        except Exception as e:
            # Any exception is recorded; a 'concurrent transaction' message is
            # the specific failure this test guards against
            self.query_state[rel] = 'error: {}'.format(e)

    def async_select(self, rel, sleep=10):
        # Run the select statement in a thread. When the query returns, the global
        # query_state will be updated with a state of good/bad/error, and the associated
        # error will be reported if one was raised.

        schema = self.unique_schema()
        query = '''
            -- async_select: {rel}
            select {schema}.f_sleep({sleep}) from {schema}.{rel}
        '''.format(
            schema=schema,
            sleep=sleep,
            rel=rel)

        thread = threading.Thread(target=lambda: self.run_select_and_check(rel, query))
        thread.start()
        return thread

    def run_test(self):
        self.use_profile("redshift")

        # First run the project to make sure the models exist
        self.run_dbt(args=['run'])

        # Execute long-running queries in threads
        t1 = self.async_select('view_model', 10)
        t2 = self.async_select('model_1', 5)

        # While the queries are executing, re-run the project
        res = self.run_dbt(args=['run', '--threads', '8'])

        # Finally, wait for these threads to finish
        t1.join()
        t2.join()

        self.assertTrue(len(res) > 0)

        # If the query succeeded, the global query_state should be 'good'
        self.assertEqual(self.query_state['view_model'], 'good')
        self.assertEqual(self.query_state['model_1'], 'good')

class TableTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-table"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_table(self):
        self.reset()
        self.run_test()

class ViewTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-view"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_view(self):
        self.reset()
        self.run_test()

class IncrementalTestConcurrentTransaction(BaseTestConcurrentTransaction):
    @property
    def models(self):
        return "test/integration/032_concurrent_transaction_test/models-incremental"

    @attr(type="redshift")
    def test__redshift__concurrent_transaction_incremental(self):
        self.reset()
        self.run_test()
24 changes: 24 additions & 0 deletions test/integration/base.py
@@ -64,6 +64,28 @@ def postgres_profile(self):
            }
        }

    def redshift_profile(self):
        return {
            'config': {
                'send_anonymous_usage_stats': False
            },
            'test': {
                'outputs': {
                    'default2': {
                        'type': 'redshift',
                        'threads': 1,
                        'host': os.getenv('REDSHIFT_TEST_HOST'),
                        'port': os.getenv('REDSHIFT_TEST_PORT'),
                        'user': os.getenv('REDSHIFT_TEST_USER'),
                        'pass': os.getenv('REDSHIFT_TEST_PASS'),
                        'dbname': os.getenv('REDSHIFT_TEST_DBNAME'),
                        'schema': self.unique_schema()
                    }
                },
                'target': 'default2'
            }
        }

    def snowflake_profile(self):
        return {
            'config': {
@@ -137,6 +159,8 @@ def get_profile(self, adapter_type):
            return self.snowflake_profile()
        elif adapter_type == 'bigquery':
            return self.bigquery_profile()
        elif adapter_type == 'redshift':
            return self.redshift_profile()

    def setUp(self):
        self.adapter_type = 'postgres'