Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract manifest generation from tasks #6565

Merged
merged 18 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230110-115725.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Abstract manifest generation
time: 2023-01-10T11:57:25.193965-06:00
custom:
Author: stu-k
Issue: "6357"
97 changes: 73 additions & 24 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import click
from dbt.cli import requires, params as p
from dbt.config import RuntimeConfig
from dbt.config.project import Project
from dbt.config.profile import Profile
from dbt.contracts.graph.manifest import Manifest
Expand Down Expand Up @@ -120,10 +119,15 @@ def cli(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def build(ctx, **kwargs):
stu-k marked this conversation as resolved.
Show resolved Hide resolved
"""Run all Seeds, Models, Snapshots, and tests in DAG order"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = BuildTask(ctx.obj["flags"], config)
task = BuildTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -166,7 +170,6 @@ def docs(ctx, **kwargs):
@p.models
@p.profile
@p.profiles_dir
@p.project_dir
@p.select
@p.selector
@p.state
Expand All @@ -178,10 +181,11 @@ def docs(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def docs_generate(ctx, **kwargs):
"""Generate the documentation website for your project"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = GenerateTask(ctx.obj["flags"], config)
task = GenerateTask(ctx.obj["flags"], ctx.obj["runtime_config"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry that I'm catching this post-merge but - should GenerateTask be passed the manifest?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, it is inheriting CompileTask, the reason it need manifest is that generate task would go to the warehouse and fetch all info about the models in current dbt project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! docs generate also performs a full compile of all manifest nodes — unless --select is passed, in which case only those nodes; or if --no-compile is passed, in which case it doesn't

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the issue is being addressed by @aranke in the cli test work here.


results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -225,11 +229,18 @@ def docs_serve(ctx, **kwargs):
@p.vars
@p.version_check
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def compile(ctx, **kwargs):
"""Generates executable SQL from source, model, test, and analysis files. Compiled SQL files are written to the
target/ directory."""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = CompileTask(ctx.obj["flags"], config)
task = CompileTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -267,7 +278,6 @@ def debug(ctx, **kwargs):
def deps(ctx, **kwargs):
"""Pull the most recent version of the dependencies listed in packages.yml"""
task = DepsTask(ctx.obj["flags"], ctx.obj["project"])

results = task.run()
success = task.interpret_results(results)
return results, success
Expand Down Expand Up @@ -311,10 +321,15 @@ def init(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def list(ctx, **kwargs):
"""List the resources in your project"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = ListTask(ctx.obj["flags"], config)
task = ListTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand All @@ -341,9 +356,13 @@ def list(ctx, **kwargs):
@p.version_check
@p.write_manifest
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest(write_perf_info=True)
def parse(ctx, **kwargs):
"""Parses the project and provides information on performance"""
click.echo(f"`{inspect.stack()[0][3]}` called\n flags: {ctx.obj['flags']}")
# manifest generation and writing happens in @requires.manifest
return None, True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this PR mean that:

  • dbt parse will always write the manifest (= overwrite manifest.json). IMO that's a good change! I'd opened an issue for it recently: [CT-1759] dbt parse should (over)write manifest.json by default #6534. So we can also remove --write-manifest as a parameter for this command, and in params.py.
  • We no longer have the option of dbt parse --compile, which allows us to output detailed performance timing that includes the DAG construction step (= resolving ephemeral model references + linking nodes/edges).

For the second point: I'd be happy with kicking that out of scope for this PR, and opening a tech debt ticket to track it. There's a bigger idea here: "Abstract manifest compilation / graph generation from tasks." Idea being, move compile_manifest() out from task initialization, into either a conditional step within @requires.manifest, or as its own @requires.graph step. Then, tasks would accept the graph as an argument.

Considerations there:

  • Manifest compilation / graph generation does require the adapter, and therefore RuntimeConfig, as an input
  • It mutates the manifest and returns a Graph
  • The build task produces + uses a different graph from all other tasks, with extra test edges
  • If we stored the graph on ctx.obj, and passed it into each task explicitly, we would unlock the ability for programmatic invocations to cache + reuse the graph between invocations. That could make a difference for performance in very large projects. (With the important caveat that the external application would be responsible for distinguishing between standard and build-specific graphs.)

Finally, a separate proposal, out of scope for this PR but highly relevant: The parse task should return the Manifest here, instead of None (#6547). That could be as simple as changing this last line to:

def parse(ctx, **kwargs) -> Tuple[Manifest, bool]:
    """Parses the project and provides information on performance"""
    # manifest generation and writing happens in @requires.manifest
    return ctx.obj["manifest"], True

And then:

from dbt.cli.main import dbtRunner
dbt = dbtRunner()
manifest, _ = dbt.invoke(['parse'])

There's a bit more refinement we should do on that proposal first, to make sure it's an idea we're happy with.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the context. I had spoken with Gerda about the function of the existing ParseTask class, and she had said I could take out the tracking. Adding it back in shouldn't be difficult, and could be done conditionally.

For your proposal of returning the manifest from the task directly, I think that's something we need to discuss on the exec team. I believe we should have a standard output for each of these functions, perhaps a dict with a result or message or whatever key should contain result, in case we want to add meta information to that result object later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had spoken with Gerda about the function of the existing ParseTask class, and she had said I could take out the tracking. Adding it back in shouldn't be difficult, and could be done conditionally.

This is fine with me too. Gerda consolidated all the events in this task, anyway, to just ParseCmdOut (in the main branch). We'll just want to remove that event now that it's no longer being called anywhere. For simplicity, let's track that in a separate issue, rather than try to do branch merging shenanigans here.


The point I was making above was around, dbt parse --compile would also conditionally include the step around manifest compilation / graph generation, which can be very slow (as we've seen in recent reports/issues from folks with large projects). Let's track that in the new issue, "Abstract manifest compilation / graph generation from tasks."


For your proposal of returning the manifest from the task directly, I think that's something we need to discuss on the exec team. I believe we should have a standard output for each of these functions, perhaps a dict with a result or message or whatever key should contain result, in case we want to add meta information to that result object later.

Agreed! Let's keep discussing in the separate linked issue. The biggest considerations are:

  • How to keep the results relatively consistent for all commands (although different tasks already return differently typed result objects)
  • Should we expose the full manifest, as part of our public API? Just a part of it? Should we call this "experimental" functionality (liable to change in future versions)? We have the power to decide these things!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to call these changes out here:
https://docs.getdbt.com/guides/migration/versions/upgrading-to-v1.5

Did the proposed "Abstract manifest compilation / graph generation from tasks." issue ever get created?

In the meantime, I'm going to delete both of these from params.py as part of #6546 since they don't appear to do anything:

compile_parse = click.option(
"--compile/--no-compile",
envvar=None,
help="TODO: No help text currently available",
default=True,
)

write_manifest = click.option(
"--write-manifest/--no-write-manifest",
envvar=None,
help="TODO: No help text currently available",
default=True,
)



Expand All @@ -369,10 +388,15 @@ def parse(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def run(ctx, **kwargs):
"""Compile SQL and execute against the current target database."""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = RunTask(ctx.obj["flags"], config)
task = RunTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand All @@ -392,10 +416,15 @@ def run(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def run_operation(ctx, **kwargs):
"""Run the named macro with any supplied arguments."""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = RunOperationTask(ctx.obj["flags"], config)
task = RunOperationTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -423,10 +452,15 @@ def run_operation(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def seed(ctx, **kwargs):
"""Load data from csv files into your data warehouse."""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = SeedTask(ctx.obj["flags"], config)
task = SeedTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand All @@ -451,10 +485,15 @@ def seed(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def snapshot(ctx, **kwargs):
"""Execute snapshots defined in your project"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = SnapshotTask(ctx.obj["flags"], config)
task = SnapshotTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -486,10 +525,15 @@ def source(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def freshness(ctx, **kwargs):
"""check the current freshness of the project's sources"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = FreshnessTask(ctx.obj["flags"], config)
task = FreshnessTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down Expand Up @@ -525,10 +569,15 @@ def freshness(ctx, **kwargs):
@requires.preflight
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest
def test(ctx, **kwargs):
"""Runs tests on data in deployed models. Run this after `dbt run`"""
config = RuntimeConfig.from_parts(ctx.obj["project"], ctx.obj["profile"], ctx.obj["flags"])
task = TestTask(ctx.obj["flags"], config)
task = TestTask(
ctx.obj["flags"],
ctx.obj["runtime_config"],
ctx.obj["manifest"],
)

results = task.run()
success = task.interpret_results(results)
Expand Down
71 changes: 70 additions & 1 deletion core/dbt/cli/requires.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from dbt.adapters.factory import adapter_management
from dbt.adapters.factory import adapter_management, register_adapter
from dbt.cli.flags import Flags
from dbt.config import RuntimeConfig
from dbt.config.runtime import load_project, load_profile
from dbt.events.functions import setup_event_logger
from dbt.exceptions import DbtProjectError
from dbt.parser.manifest import ManifestLoader, write_manifest
from dbt.profiler import profiler
from dbt.tracking import initialize_from_flags, track_run

Expand Down Expand Up @@ -85,3 +87,70 @@ def wrapper(*args, **kwargs):
return func(*args, **kwargs)

return update_wrapper(wrapper, func)


def runtime_config(func):
"""A decorator used by click command functions for generating a runtime
config given a profile and project.
"""

def wrapper(*args, **kwargs):
ctx = args[0]
assert isinstance(ctx, Context)

req_strs = ["profile", "project"]
reqs = [ctx.obj.get(req_str) for req_str in req_strs]

if None in reqs:
raise DbtProjectError("profile and project required for runtime_config")

ctx.obj["runtime_config"] = RuntimeConfig.from_parts(
ctx.obj["project"],
ctx.obj["profile"],
ctx.obj["flags"],
)

return func(*args, **kwargs)

return update_wrapper(wrapper, func)


def manifest(*args0, write_perf_info=False):
stu-k marked this conversation as resolved.
Show resolved Hide resolved
"""A decorator used by click command functions for generating a manifest
given a profile, project, and runtime config. This also registers the adaper
from the runtime config and writes the manifest to disc.
"""

def outer_wrapper(func):
def wrapper(*args, **kwargs):
ctx = args[0]
assert isinstance(ctx, Context)

req_strs = ["profile", "project", "runtime_config"]
reqs = [ctx.obj.get(dep) for dep in req_strs]

if None in reqs:
raise DbtProjectError("profile, project, and runtime_config required for manifest")

runtime_config = ctx.obj["runtime_config"]
register_adapter(runtime_config)

# a manifest has already been set on the context, so don't overwrite it
if ctx.obj.get("manifest") is None:
manifest = ManifestLoader.get_full_manifest(
runtime_config, write_perf_info=write_perf_info
)

ctx.obj["manifest"] = manifest
if ctx.obj["flags"].write_json:
write_manifest(manifest, ctx.obj["runtime_config"].target_path)
stu-k marked this conversation as resolved.
Show resolved Hide resolved

return func(*args, **kwargs)

return update_wrapper(wrapper, func)

# if there are no args, the decorator was used without params @decorator
# otherwise, the decorator was called with params @decorator(arg)
if len(args0) == 0:
return outer_wrapper
return outer_wrapper(args0[0])
Comment on lines +152 to +156
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better more pythonic way to have a decorator accept args?

Binary file modified core/dbt/docs/build/doctrees/environment.pickle
Binary file not shown.
Binary file modified core/dbt/docs/build/doctrees/index.doctree
Binary file not shown.
1 change: 1 addition & 0 deletions core/dbt/docs/build/html/_sources/index.rst.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ How to invoke dbt commands in python runtime
Right now the best way to invoke a command from python runtime is to use the `dbtRunner` we exposed

.. code-block:: python

from dbt.cli.main import dbtRunner
cli_args = ['run', '--project-dir', 'jaffle_shop']

Expand Down
22 changes: 22 additions & 0 deletions core/dbt/docs/build/html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ <h1>dbt-core’s API documentation<a class="headerlink" href="#dbt-core-s-api-do
<section id="how-to-invoke-dbt-commands-in-python-runtime">
<h2>How to invoke dbt commands in python runtime<a class="headerlink" href="#how-to-invoke-dbt-commands-in-python-runtime" title="Permalink to this heading">¶</a></h2>
<p>Right now the best way to invoke a command from python runtime is to use the <cite>dbtRunner</cite> we exposed</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">dbt.cli.main</span> <span class="kn">import</span> <span class="n">dbtRunner</span>
<span class="n">cli_args</span> <span class="o">=</span> <span class="p">[</span><span class="s1">&#39;run&#39;</span><span class="p">,</span> <span class="s1">&#39;--project-dir&#39;</span><span class="p">,</span> <span class="s1">&#39;jaffle_shop&#39;</span><span class="p">]</span>

<span class="c1"># initialize the dbt runner</span>
<span class="n">dbt</span> <span class="o">=</span> <span class="n">dbtRunner</span><span class="p">()</span>
<span class="c1"># run the command</span>
<span class="n">res</span><span class="p">,</span> <span class="n">success</span> <span class="o">=</span> <span class="n">dbt</span><span class="o">.</span><span class="n">invoke</span><span class="p">(</span><span class="n">args</span><span class="p">)</span>
</pre></div>
</div>
<p>You can also pass in pre constructed object into dbtRunner, and we will use those objects instead of loading up from the disk.</p>
<div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># preload profile and project</span>
<span class="n">profile</span> <span class="o">=</span> <span class="n">load_profile</span><span class="p">(</span><span class="n">project_dir</span><span class="p">,</span> <span class="p">{},</span> <span class="s1">&#39;testing-postgres&#39;</span><span class="p">)</span>
Expand Down Expand Up @@ -92,6 +101,11 @@ <h4>project_dir<a class="headerlink" href="#build|project_dir" title="Permalink
<p>Type: path</p>
<p>Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents.</p>
</section>
<section id="build|resource_types">
<h4>resource_types<a class="headerlink" href="#build|resource_types" title="Permalink to this heading">¶</a></h4>
<p>Type: unknown</p>
<p>TODO: No current help text</p>
</section>
<section id="build|select">
<h4>select<a class="headerlink" href="#build|select" title="Permalink to this heading">¶</a></h4>
<p>Type: unknown</p>
Expand Down Expand Up @@ -313,6 +327,10 @@ <h4>vars<a class="headerlink" href="#deps|vars" title="Permalink to this heading
</section>
<h3>Command: docs<a class="headerlink" href="#dbt-section" title="Permalink to this heading">¶</a></h3>
<h3>Command: init<a class="headerlink" href="#dbt-section" title="Permalink to this heading">¶</a></h3>
<section id="init|project_name">
<h4>project_name<a class="headerlink" href="#init|project_name" title="Permalink to this heading">¶</a></h4>
<p>Type: string</p>
</section>
<section id="init|profile">
<h4>profile<a class="headerlink" href="#init|profile" title="Permalink to this heading">¶</a></h4>
<p>Type: string</p>
Expand Down Expand Up @@ -618,6 +636,10 @@ <h4>version_check<a class="headerlink" href="#run|version_check" title="Permalin
<p>Ensure dbt’s version matches the one specified in the dbt_project.yml file (‘require-dbt-version’)</p>
</section>
<h3>Command: run_operation<a class="headerlink" href="#dbt-section" title="Permalink to this heading">¶</a></h3>
<section id="run-operation|macro">
<h4>macro<a class="headerlink" href="#run-operation|macro" title="Permalink to this heading">¶</a></h4>
<p>Type: string</p>
</section>
<section id="run-operation|args">
<h4>args<a class="headerlink" href="#run-operation|args" title="Permalink to this heading">¶</a></h4>
<p>Type: YAML</p>
Expand Down
Loading