Skip to content

Commit

Permalink
Merge pull request #1792 from fishtown-analytics/fix/run-operation-fl…
Browse files Browse the repository at this point in the history
…at-graph

Make run-operation a ManifestTask
  • Loading branch information
beckjake authored Sep 27, 2019
2 parents aea03eb + 6fdac50 commit c211d0e
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
1 change: 1 addition & 0 deletions core/dbt/task/rpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, args, config, tasks=None):
super(RPCServerTask, self).__init__(args, config)
# compile locally
self.manifest = self._compile_manifest()
self.manifest.build_flat_graph()
self.task_manager = rpc.TaskManager()
tasks = tasks or [RemoteCompileTask, RemoteRunTask]
for cls in tasks:
Expand Down
13 changes: 8 additions & 5 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.task.base import ConfiguredTask
from dbt.task.runnable import ManifestTask
from dbt.adapters.factory import get_adapter
from dbt.loader import GraphLoader

import dbt
import dbt.utils
import dbt.exceptions


class RunOperationTask(ConfiguredTask):
class RunOperationTask(ManifestTask):
def _get_macro_parts(self):
macro_name = self.args.macro
if '.' in macro_name:
Expand All @@ -21,8 +20,11 @@ def _get_macro_parts(self):
def _get_kwargs(self):
return dbt.utils.parse_cli_vars(self.args.args)

def compile_manifest(self):
# skip building a linker, but do make sure to build the flat graph
self.manifest.build_flat_graph()

def _run_unsafe(self):
manifest = GraphLoader.load_all(self.config)
adapter = get_adapter(self.config)

package_name, macro_name = self._get_macro_parts()
Expand All @@ -34,12 +36,13 @@ def _run_unsafe(self):
macro_name,
project=package_name,
kwargs=macro_kwargs,
manifest=manifest
manifest=self.manifest
)

return res

def run(self):
self._runtime_initialize()
try:
result = self._run_unsafe()
except dbt.exceptions.Exception as exc:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@
{% endset %}
{% do run_query(query) %}
{% endmacro %}


{% macro log_graph() %}
{% for node in graph.nodes.values() %}
{{ log((node | string), info=True)}}
{% endfor %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ def test__postgres_vacuum_ref(self):
@use_profile('postgres')
def test__postgres_select(self):
self.run_operation('select_something', name='world')

@use_profile('postgres')
def test__postgres_access_graph(self):
self.run_operation('log_graph')

0 comments on commit c211d0e

Please sign in to comment.