Skip to content

Commit

Permalink
Merge pull request #1053 from fishtown-analytics/fix/ephemeral-compil…
Browse files Browse the repository at this point in the history
…e-errors

On ephemeral compile errors that lead to skips, generate real errors (#1037)
  • Loading branch information
beckjake authored Oct 15, 2018
2 parents 618dee0 + fb0da45 commit dd25750
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 12 deletions.
34 changes: 29 additions & 5 deletions dbt/node_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, config, adapter, node, node_index, num_nodes):
self.num_nodes = num_nodes

self.skip = False
self.skip_cause = None

def raise_on_first_error(self):
return False
Expand Down Expand Up @@ -170,15 +171,38 @@ def on_skip(self):
schema_name = self.node.schema
node_name = self.node.name

error = None
if not self.is_ephemeral_model(self.node):
dbt.ui.printer.print_skip_line(self.node, schema_name, node_name,
self.node_index, self.num_nodes)

node_result = RunModelResult(self.node, skip=True)
if self.skip_cause is None:
dbt.ui.printer.print_skip_line(
self.node,
schema_name,
node_name,
self.node_index,
self.num_nodes
)
else:
dbt.ui.printer.print_skip_caused_by_error(
self.node,
schema_name,
node_name,
self.node_index,
self.num_nodes,
self.skip_cause
)
# set an error so dbt will exit with an error code
error = (
'Compilation Error in referenced ephemeral model {}.{}'
.format(self.skip_cause.node.schema,
self.skip_cause.node.name)
)

node_result = RunModelResult(self.node, skip=True, error=error)
return node_result

def do_skip(self):
def do_skip(self, cause=None):
self.skip = True
self.skip_cause = cause

@classmethod
def get_model_schemas(cls, manifest):
Expand Down
22 changes: 17 additions & 5 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,18 @@ def execute_nodes(self, linker, Runner, manifest, node_dependency_list):

try:
for result in pool.imap_unordered(self.call_runner, args_list):
if not Runner.is_ephemeral_model(result.node):
is_ephemeral = Runner.is_ephemeral_model(result.node)
if not is_ephemeral:
node_results.append(result)

node = CompileResultNode(**result.node)
node_id = node.unique_id
manifest.nodes[node_id] = node

if result.errored:
for dep_node_id in self.get_dependent(linker, node_id):
runner = node_runners.get(dep_node_id)
if runner:
runner.do_skip()
dependents = self.get_dependent(linker, node_id)
self._mark_dependent_errors(node_runners, dependents,
result, is_ephemeral)

except KeyboardInterrupt:
pool.close()
Expand Down Expand Up @@ -164,6 +164,18 @@ def execute_nodes(self, linker, Runner, manifest, node_dependency_list):

return node_results

@staticmethod
def _mark_dependent_errors(node_runners, dependents, result, is_ephemeral):
for dep_node_id in dependents:
runner = node_runners.get(dep_node_id)
if not runner:
continue
if is_ephemeral:
cause = result
else:
cause = None
runner.do_skip(cause=result)

def write_results(self, execution_result):
filepath = os.path.join(self.config.target_path, RESULT_FILE_NAME)
write_json(filepath, execution_result.serialize())
Expand Down
13 changes: 11 additions & 2 deletions dbt/ui/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ def print_run_status_line(results):
logger.info(stats_line.format(**stats))


def print_run_result_error(result):
logger.info("")
def print_run_result_error(result, newline=True):
if newline:
logger.info("")

if result.failed:
logger.info(yellow("Failure in {} {} ({})").format(
Expand All @@ -245,6 +246,14 @@ def print_run_result_error(result):
logger.info(line)


def print_skip_caused_by_error(model, schema, relation, index, num_models,
result):
msg = ('SKIP relation {}.{} due to ephemeral model error'
.format(schema, relation))
print_fancy_output_line(msg, red('ERROR SKIP'), index, num_models)
print_run_result_error(result, newline=False)


def print_end_of_run_summary(num_errors, early_exit=False):
if early_exit:
message = yellow('Exited because of keyboard interrupt.')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{{ config(materialized='ephemeral') }}

select * from {{ this.schema }}.seed
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{{ config(materialized='ephemeral') }}

{{ adapter.invalid_method() }}

select * from {{ ref('base') }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- base copy is an error
select * from {{ref('base_copy')}} where gender = 'Male'
17 changes: 17 additions & 0 deletions test/integration/020_ephemeral_test/test_ephemeral.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,20 @@ def test__snowflake(self):
self.assertManyTablesEqual(
["SEED", "DEPENDENT", "DOUBLE_DEPENDENT", "SUPER_DEPENDENT"]
)

class TestEphemeralErrorHandling(DBTIntegrationTest):
@property
def schema(self):
return "ephemeral_020"

@property
def models(self):
return "test/integration/020_ephemeral_test/ephemeral-errors"

@attr(type='postgres')
def test__postgres_upstream_error(self):
self.run_sql_file("test/integration/020_ephemeral_test/seed.sql")

results = self.run_dbt(expect_pass=False)
self.assertEqual(len(results), 1)
self.assertTrue(results[0].errored)

0 comments on commit dd25750

Please sign in to comment.