From 65e29ed5448c9991d9afd00490e6aff471f5c927 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Wed, 5 Apr 2023 11:43:06 -0500
Subject: [PATCH 01/10] Add option to skip relation cache population

---
 .../unreleased/Features-20230410-115538.yaml |  6 ++++
 core/dbt/cli/main.py                         | 17 +++++++++++
 core/dbt/cli/params.py                       |  7 +++++
 core/dbt/contracts/project.py                | 29 ++++++++++---------
 core/dbt/task/runnable.py                    |  3 ++
 5 files changed, 48 insertions(+), 14 deletions(-)
 create mode 100644 .changes/unreleased/Features-20230410-115538.yaml

diff --git a/.changes/unreleased/Features-20230410-115538.yaml b/.changes/unreleased/Features-20230410-115538.yaml
new file mode 100644
index 00000000000..9f6f548c672
--- /dev/null
+++ b/.changes/unreleased/Features-20230410-115538.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Add --no-populate-cache to optionally skip relation cache population
+time: 2023-04-10T11:55:38.360997-05:00
+custom:
+  Author: stu-k
+  Issue: "1751"
diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py
index dddaf4a2194..76622a068bf 100644
--- a/core/dbt/cli/main.py
+++ b/core/dbt/cli/main.py
@@ -179,6 +179,7 @@ def cli(ctx, **kwargs):
 @p.deprecated_favor_state
 @p.full_refresh
 @p.indirect_selection
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -216,6 +217,7 @@ def build(ctx, **kwargs):
 # dbt clean
 @cli.command("clean")
 @click.pass_context
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -250,6 +252,7 @@ def docs(ctx, **kwargs):
 @p.exclude
 @p.favor_state
 @p.deprecated_favor_state
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -285,6 +288,7 @@ def docs_generate(ctx, **kwargs):
 @docs.command("serve")
 @click.pass_context
 @p.browser
+@p.populate_cache
 @p.port
 @p.profile
 @p.profiles_dir
@@ -323,6 +327,7 @@ def docs_serve(ctx, **kwargs):
 @p.indirect_selection
 @p.introspect
 @p.parse_only
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -370,6 +375,7 @@ def compile(ctx, **kwargs):
 @p.indirect_selection
 @p.introspect
 @p.parse_only
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -407,6 +413,7 @@ def show(ctx, **kwargs):
 @cli.command("debug")
 @click.pass_context
 @p.config_dir
+@p.populate_cache
 @p.profile
 @p.profiles_dir_exists_false
 @p.project_dir
@@ -430,6 +437,7 @@ def debug(ctx, **kwargs):
 # dbt deps
 @cli.command("deps")
 @click.pass_context
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -452,6 +460,7 @@ def deps(ctx, **kwargs):
 @click.pass_context
 # for backwards compatibility, accept 'project_name' as an optional positional argument
 @click.argument("project_name", required=False)
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -477,6 +486,7 @@ def init(ctx, **kwargs):
 @p.models
 @p.output
 @p.output_keys
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -516,6 +526,7 @@ def list(ctx, **kwargs):
 @cli.command("parse")
 @click.pass_context
 @p.compile_parse
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -548,6 +559,7 @@ def parse(ctx, **kwargs):
 @p.exclude
 @p.fail_fast
 @p.full_refresh
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -584,6 +596,7 @@ def run(ctx, **kwargs):
 @click.pass_context
 @click.argument("macro")
 @p.args
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -613,6 +626,7 @@ def run_operation(ctx, **kwargs):
 @click.pass_context
 @p.exclude
 @p.full_refresh
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -652,6 +666,7 @@ def seed(ctx, **kwargs):
 @p.exclude
 @p.favor_state
 @p.deprecated_favor_state
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -693,6 +708,7 @@ def source(ctx, **kwargs):
 @click.pass_context
 @p.exclude
 @p.output_path  # TODO: Is this ok to re-use? We have three different output params, how much can we consolidate?
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -738,6 +754,7 @@ def freshness(ctx, **kwargs):
 @p.favor_state
 @p.deprecated_favor_state
 @p.indirect_selection
+@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py
index 661c982f694..870da403cb7 100644
--- a/core/dbt/cli/params.py
+++ b/core/dbt/cli/params.py
@@ -238,6 +238,13 @@
     default=True,
 )
 
+populate_cache = click.option(
+    "--populate-cache/--no-populate-cache",
+    envvar="DBT_POPULATE_CACHE",
+    help="At start of run, use `show` or `information_schema` queries to populate a relational cache, which can speed up subsequent materializations. This overrides the user configuration file.",
+    default=True,
+)
+
 port = click.option(
     "--port",
     envvar=None,
diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py
index 18c6a0ba758..581932e5888 100644
--- a/core/dbt/contracts/project.py
+++ b/core/dbt/contracts/project.py
@@ -243,25 +243,26 @@ def validate(cls, data):
 
 @dataclass
 class UserConfig(ExtensibleDbtClassMixin, Replaceable, UserConfigContract):
-    send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS
-    use_colors: Optional[bool] = None
-    use_colors_file: Optional[bool] = None
-    partial_parse: Optional[bool] = None
-    printer_width: Optional[int] = None
-    write_json: Optional[bool] = None
-    warn_error: Optional[bool] = None
-    warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
+    cache_selected_only: Optional[bool] = None
+    debug: Optional[bool] = None
+    fail_fast: Optional[bool] = None
+    indirect_selection: Optional[str] = None
     log_format: Optional[str] = None
     log_format_file: Optional[str] = None
     log_level: Optional[str] = None
     log_level_file: Optional[str] = None
-    debug: Optional[bool] = None
-    version_check: Optional[bool] = None
-    fail_fast: Optional[bool] = None
-    use_experimental_parser: Optional[bool] = None
+    partial_parse: Optional[bool] = None
+    populate_cache: Optional[bool] = None
+    printer_width: Optional[int] = None
+    send_anonymous_usage_stats: bool = DEFAULT_SEND_ANONYMOUS_USAGE_STATS
     static_parser: Optional[bool] = None
-    indirect_selection: Optional[str] = None
-    cache_selected_only: Optional[bool] = None
+    use_colors: Optional[bool] = None
+    use_colors_file: Optional[bool] = None
+    use_experimental_parser: Optional[bool] = None
+    version_check: Optional[bool] = None
+    warn_error: Optional[bool] = None
+    warn_error_options: Optional[Dict[str, Union[str, List[str]]]] = None
+    write_json: Optional[bool] = None
 
 
 @dataclass
diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py
index c4bd99e9960..a6ba3d08ef4 100644
--- a/core/dbt/task/runnable.py
+++ b/core/dbt/task/runnable.py
@@ -371,6 +371,9 @@ def _mark_dependent_errors(self, node_id, result, cause):
         self._skipped_children[dep_node_id] = cause
 
     def populate_adapter_cache(self, adapter, required_schemas: Set[BaseRelation] = None):
+        if not self.args.populate_cache:
+            return
+
         start_populate_cache = time.perf_counter()
         if get_flags().CACHE_SELECTED_ONLY is True:
             adapter.set_relations_cache(self.manifest, required_schemas=required_schemas)
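The option added in params.py uses Click's paired boolean-flag convention: `--populate-cache/--no-populate-cache` registers a single boolean parameter, defaulting to True, that the `DBT_POPULATE_CACHE` environment variable can also drive. A minimal, self-contained sketch of the same pattern (a standalone illustration, not code from this patch):

    import click

    @click.command()
    @click.option(
        "--populate-cache/--no-populate-cache",
        envvar="DBT_POPULATE_CACHE",
        default=True,
        help="Populate the adapter's relation cache at the start of the run.",
    )
    def run(populate_cache):
        # Click collapses the on/off pair into one boolean argument
        click.echo(f"populate_cache={populate_cache}")

    if __name__ == "__main__":
        run()

Invoked with no flag this prints populate_cache=True; passing `--no-populate-cache` flips it to False, which is exactly what lets `populate_adapter_cache` in runnable.py return early.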
From a9356bd7046cd64dafd7e6cdd25bb4d6784a6d52 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Mon, 10 Apr 2023 16:05:59 -0500
Subject: [PATCH 02/10] Move populate cache option to cli group

---
 core/dbt/cli/main.py | 22 +++++-----------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py
index 76622a068bf..5a15ad2764d 100644
--- a/core/dbt/cli/main.py
+++ b/core/dbt/cli/main.py
@@ -148,6 +148,11 @@ def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
 @p.macro_debugging
 @p.partial_parse
 @p.print
+<<<<<<< HEAD
+=======
+@p.deprecated_print
+@p.populate_cache
+>>>>>>> 362012793 (Move populate cache option to cli group)
 @p.printer_width
 @p.quiet
 @p.record_timing_info
@@ -179,7 +184,6 @@ def cli(ctx, **kwargs):
 @p.deprecated_favor_state
 @p.full_refresh
 @p.indirect_selection
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -217,7 +221,6 @@ def build(ctx, **kwargs):
 # dbt clean
 @cli.command("clean")
 @click.pass_context
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -252,7 +255,6 @@ def docs(ctx, **kwargs):
 @p.exclude
 @p.favor_state
 @p.deprecated_favor_state
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -288,7 +290,6 @@ def docs_generate(ctx, **kwargs):
 @docs.command("serve")
 @click.pass_context
 @p.browser
-@p.populate_cache
 @p.port
 @p.profile
 @p.profiles_dir
@@ -327,7 +328,6 @@ def docs_serve(ctx, **kwargs):
 @p.indirect_selection
 @p.introspect
 @p.parse_only
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -375,7 +375,6 @@ def compile(ctx, **kwargs):
 @p.indirect_selection
 @p.introspect
 @p.parse_only
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -412,7 +412,6 @@ def show(ctx, **kwargs):
 @cli.command("debug")
 @click.pass_context
 @p.config_dir
-@p.populate_cache
 @p.profile
 @p.profiles_dir_exists_false
 @p.project_dir
@@ -435,7 +435,6 @@ def debug(ctx, **kwargs):
 # dbt deps
 @cli.command("deps")
 @click.pass_context
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -457,7 +457,6 @@ def deps(ctx, **kwargs):
 @click.pass_context
 # for backwards compatibility, accept 'project_name' as an optional positional argument
 @click.argument("project_name", required=False)
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -482,7 +482,6 @@ def init(ctx, **kwargs):
 @p.models
 @p.output
 @p.output_keys
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -521,7 +521,6 @@ def list(ctx, **kwargs):
 @cli.command("parse")
 @click.pass_context
 @p.compile_parse
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -553,7 +553,6 @@ def parse(ctx, **kwargs):
 @p.exclude
 @p.fail_fast
 @p.full_refresh
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -589,7 +589,6 @@ def run(ctx, **kwargs):
 @click.pass_context
 @click.argument("macro")
 @p.args
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -618,7 +618,6 @@ def run_operation(ctx, **kwargs):
 @click.pass_context
 @p.exclude
 @p.full_refresh
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -657,7 +657,6 @@ def seed(ctx, **kwargs):
 @p.exclude
 @p.favor_state
 @p.deprecated_favor_state
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -698,7 +698,6 @@ def source(ctx, **kwargs):
 @click.pass_context
 @p.exclude
 @p.output_path  # TODO: Is this ok to re-use? We have three different output params, how much can we consolidate?
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
@@ -754,7 +743,6 @@ def freshness(ctx, **kwargs):
 @p.favor_state
 @p.deprecated_favor_state
 @p.indirect_selection
-@p.populate_cache
 @p.profile
 @p.profiles_dir
 @p.project_dir
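Moving the option from each subcommand onto the `cli` group changes where it appears on the command line: it now precedes the subcommand (`dbt --no-populate-cache run` rather than `dbt run --no-populate-cache`). The same ordering applies to the programmatic entry point whose `invoke` signature appears in the hunk above; a sketch, assuming a parsed dbt project on disk:

    from dbt.cli.main import dbtRunner

    # group-level flags come before the subcommand in the argument list
    res = dbtRunner().invoke(["--no-populate-cache", "run"])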
From 150f9f27819bc0f8388fb7015302a64a91e0f73f Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Mon, 10 Apr 2023 17:04:52 -0500
Subject: [PATCH 03/10] Add possible implementation for CT-1498

---
 core/dbt/adapters/base/impl.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 6bf86b90bde..014f89902ef 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -718,6 +718,24 @@ def list_relations(self, database: Optional[str], schema: str) -> List[BaseRelat
         # we can't build the relations cache because we don't have a
         # manifest so we can't run any operations.
         relations = self.list_relations_without_caching(schema_relation)
+
+        # if the cache is already populated, add this schema in
+        # otherwise, skip updating the cache and just ignore
+        if self.cache:
+            for relation in relations:
+                self.cache.add(relation)
+            if not relations:
+                # it's possible that there were no relations in some schemas. We want
+                # to insert the schemas we query into the cache's `.schemas` attribute
+                # so we can check it later
+
+                # Jeremy: what was the intent behind this inner loop?
+                # cache_update: Set[Tuple[Optional[str], Optional[str]]] = set()
+                # for relation in cache_schemas:
+                #     cache_update.add((database, schema))
+
+                self.cache.update_schemas(set((database, schema)))
+
         fire_event(
             ListRelations(
                 database=cast_to_str(database),
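One subtlety in the hunk above, which the next patch in the series corrects: `set()` iterates its argument, so `set((database, schema))` produces a set of two strings rather than a set containing one (database, schema) pair. With illustrative values:

    database, schema = "analytics", "staging"

    set((database, schema))   # {'analytics', 'staging'} -- two stray elements
    {(database, schema)}      # {('analytics', 'staging')} -- one pair, as intended
    [(database, schema)]      # [('analytics', 'staging')] -- the fix below; any
                              # iterable of pairs works for update_schemas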
From e06eb0040ce503c96bf4b1bffd2feb54e5691362 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Tue, 11 Apr 2023 09:57:00 -0500
Subject: [PATCH 04/10] Use iterable

---
 core/dbt/adapters/base/impl.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 014f89902ef..03d8f4a2f8c 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -734,7 +734,7 @@ def list_relations(self, database: Optional[str], schema: str) -> List[BaseRelat
                 # for relation in cache_schemas:
                 #     cache_update.add((database, schema))
 
-                self.cache.update_schemas(set((database, schema)))
+                self.cache.update_schemas([(database, schema)])
 
         fire_event(

From 46bc735a507854514bde33d4dd69765be293e527 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Tue, 11 Apr 2023 10:15:18 -0500
Subject: [PATCH 05/10] Remove comment

---
 core/dbt/adapters/base/impl.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py
index 03d8f4a2f8c..aa35dce98ae 100644
--- a/core/dbt/adapters/base/impl.py
+++ b/core/dbt/adapters/base/impl.py
@@ -728,12 +728,6 @@ def list_relations(self, database: Optional[str], schema: str) -> List[BaseRelat
                 # it's possible that there were no relations in some schemas. We want
                 # to insert the schemas we query into the cache's `.schemas` attribute
                 # so we can check it later
-
-                # Jeremy: what was the intent behind this inner loop?
-                # cache_update: Set[Tuple[Optional[str], Optional[str]]] = set()
-                # for relation in cache_schemas:
-                #     cache_update.add((database, schema))
-
                 self.cache.update_schemas([(database, schema)])
 
             fire_event(
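The comment that survives explains the bookkeeping: even when a schema holds no relations, the (database, schema) pair is recorded so a later lookup can distinguish "queried and empty" from "never queried". A stripped-down sketch of that idea (dbt's real RelationsCache carries far more state and locking than this):

    class TinyRelationsCache:
        def __init__(self):
            self.schemas = set()  # (database, schema) pairs already listed
            self.relations = {}

        def update_schemas(self, schemas):
            self.schemas.update(schemas)

    cache = TinyRelationsCache()
    cache.update_schemas([("analytics", "empty_schema")])

    # later: membership says the schema was listed, even though it was empty
    assert ("analytics", "empty_schema") in cache.schemas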
From eb5327255652b2046a9f5ff35b3c93e1745053d5 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Tue, 11 Apr 2023 11:42:52 -0500
Subject: [PATCH 06/10] Add test

---
 .../dbt/tests/adapter/caching/test_caching.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/tests/adapter/dbt/tests/adapter/caching/test_caching.py b/tests/adapter/dbt/tests/adapter/caching/test_caching.py
index 9cf02309c4c..53fa74be9c4 100644
--- a/tests/adapter/dbt/tests/adapter/caching/test_caching.py
+++ b/tests/adapter/dbt/tests/adapter/caching/test_caching.py
@@ -91,6 +91,20 @@ def test_cache(self, project):
         self.run_and_inspect_cache(project, run_args)
 
 
+class TestNoPopulateCache(BaseCachingTest):
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "model.sql": model_sql,
+        }
+
+    def test_cache(self, project):
+        # --no-populate-cache still allows the cache to populate all relations
+        # under a schema, so the behavior here remains the same as other tests
+        run_args = ["--no-populate-cache", "run"]
+        self.run_and_inspect_cache(project, run_args)
+
+
 class TestCachingLowerCaseModel(BaseCachingLowercaseModel):
     pass
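As the test's comment notes, `--no-populate-cache` only skips the bulk, up-front population; the per-schema caching added to `list_relations` in patch 03 still warms the cache on first use, so the cache inspection passes either way. The combined control flow, heavily simplified from the patches above (not verbatim dbt code):

    def populate_adapter_cache(task, adapter):
        if not task.args.populate_cache:   # --no-populate-cache
            return                         # skip the up-front bulk population
        adapter.set_relations_cache(task.manifest)

    def list_relations(adapter, database, schema):
        relations = adapter.list_relations_without_caching((database, schema))
        if adapter.cache:                  # lazily cache whatever was just listed
            for relation in relations:
                adapter.cache.add(relation)
            if not relations:
                adapter.cache.update_schemas([(database, schema)])
        return relations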
From 80ea55cec9510c56deeb7dbdae145f509fbc820b Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Tue, 11 Apr 2023 11:56:14 -0500
Subject: [PATCH 07/10] Resolve merge conflict

---
 core/dbt/cli/main.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py
index 5a15ad2764d..78bad452f3a 100644
--- a/core/dbt/cli/main.py
+++ b/core/dbt/cli/main.py
@@ -148,11 +148,7 @@ def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
 @p.macro_debugging
 @p.partial_parse
 @p.print
-<<<<<<< HEAD
-=======
-@p.deprecated_print
 @p.populate_cache
->>>>>>> 362012793 (Move populate cache option to cli group)
 @p.printer_width
 @p.quiet
 @p.record_timing_info

From 78b1a0d653df06e38aad912f5fb6a1ce320b6744 Mon Sep 17 00:00:00 2001
From: Stu Kilgore
Date: Tue, 11 Apr 2023 11:57:39 -0500
Subject: [PATCH 08/10] Alphabetize

---
 core/dbt/cli/main.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py
index 78bad452f3a..6cf21b9f31f 100644
--- a/core/dbt/cli/main.py
+++ b/core/dbt/cli/main.py
@@ -147,8 +147,8 @@ def invoke(self, args: List[str], **kwargs) -> dbtRunnerResult:
 @p.log_path
 @p.macro_debugging
 @p.partial_parse
-@p.print
 @p.populate_cache
+@p.print
 @p.printer_width
 @p.quiet
 @p.record_timing_info
From 46d1781f0d475b65c8a547c024b8fdf21c6f5a42 Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Thu, 6 Apr 2023 15:14:41 -0700
Subject: [PATCH 09/10] Add removal of node to make dbt-server compile query work

---
 core/dbt/lib.py          | 138 +--------------------------------------
 core/dbt/parser/sql.py   |  19 +-----
 core/dbt/task/compile.py |  20 ++++++
 test/unit/test_lib.py    |  62 ------------------
 4 files changed, 23 insertions(+), 216 deletions(-)
 delete mode 100644 test/unit/test_lib.py

diff --git a/core/dbt/lib.py b/core/dbt/lib.py
index 89f6c2ce910..1dabf57a496 100644
--- a/core/dbt/lib.py
+++ b/core/dbt/lib.py
@@ -1,10 +1,5 @@
 import os
 from dbt.config.project import Project
-from dbt.contracts.results import RunningStatus, collect_timing_info
-from dbt.events.functions import fire_event
-from dbt.events.types import NodeCompiling, NodeExecuting
-from dbt.exceptions import DbtRuntimeError
-from dbt.task.sql import SqlCompileRunner
 from dataclasses import dataclass
 from dbt.cli.resolvers import default_profiles_dir
 from dbt.config.runtime import load_profile, load_project
@@ -20,54 +15,6 @@ class RuntimeArgs:
     target: str
 
 
-class SqlCompileRunnerNoIntrospection(SqlCompileRunner):
-    def compile_and_execute(self, manifest, ctx):
-        """
-        This version of this method does not connect to the data warehouse.
-        As a result, introspective queries at compilation will not be supported
-        and will throw an error.
-
-        TODO: This is a temporary solution to more complex permissions requirements
-        for the semantic layer, and thus largely duplicates the code in the parent class
-        method. Once conditional credential usage is enabled, this should be removed.
-        """
-        result = None
-        ctx.node.update_event_status(node_status=RunningStatus.Compiling)
-        fire_event(
-            NodeCompiling(
-                node_info=ctx.node.node_info,
-            )
-        )
-        with collect_timing_info("compile") as timing_info:
-            # if we fail here, we still have a compiled node to return
-            # this has the benefit of showing a build path for the errant
-            # model
-            ctx.node = self.compile(manifest)
-        ctx.timing.append(timing_info)
-
-        # for ephemeral nodes, we only want to compile, not run
-        if not ctx.node.is_ephemeral_model:
-            ctx.node.update_event_status(node_status=RunningStatus.Executing)
-            fire_event(
-                NodeExecuting(
-                    node_info=ctx.node.node_info,
-                )
-            )
-            with collect_timing_info("execute") as timing_info:
-                result = self.run(ctx.node, manifest)
-                ctx.node = result.node
-
-            ctx.timing.append(timing_info)
-
-        return result
-
-
-def load_profile_project(project_dir, profile_name_override=None):
-    profile = load_profile(project_dir, {}, profile_name_override)
-    project = load_project(project_dir, False, profile, {})
-    return profile, project
-
-
 def get_dbt_config(project_dir, args=None, single_threaded=False):
     from dbt.config.runtime import RuntimeConfig
     import dbt.adapters.factory
@@ -90,7 +37,8 @@ def get_dbt_config(project_dir, args=None, single_threaded=False):
 
     # set global flags from arguments
     set_from_args(runtime_args, None)
-    profile, project = load_profile_project(project_dir, profile_name)
+    profile = load_profile(project_dir, {}, profile_name)
+    project = load_project(project_dir, False, profile, {})
     assert type(project) is Project
     config = RuntimeConfig.from_parts(project, profile, runtime_args)
 
@@ -111,88 +59,6 @@ def get_dbt_config(project_dir, args=None, single_threaded=False):
     return config
 
 
-def get_task_by_type(type):
-    from dbt.task.run import RunTask
-    from dbt.task.list import ListTask
-    from dbt.task.seed import SeedTask
-    from dbt.task.test import TestTask
-    from dbt.task.build import BuildTask
-    from dbt.task.snapshot import SnapshotTask
-    from dbt.task.run_operation import RunOperationTask
-
-    if type == "run":
-        return RunTask
-    elif type == "test":
-        return TestTask
-    elif type == "list":
-        return ListTask
-    elif type == "seed":
-        return SeedTask
-    elif type == "build":
-        return BuildTask
-    elif type == "snapshot":
-        return SnapshotTask
-    elif type == "run_operation":
-        return RunOperationTask
-
-    raise DbtRuntimeError("not a valid task")
-
-
-def create_task(type, args, manifest, config):
-    task = get_task_by_type(type)
-
-    def no_op(*args, **kwargs):
-        pass
-
-    task = task(args, config)
-    task.load_manifest = no_op
-    task.manifest = manifest
-    return task
-
-
-def _get_operation_node(manifest, project_path, sql, node_name):
-    from dbt.parser.manifest import process_node
-    from dbt.parser.sql import SqlBlockParser
-    import dbt.adapters.factory
-
-    config = get_dbt_config(project_path)
-    block_parser = SqlBlockParser(
-        project=config,
-        manifest=manifest,
-        root_project=config,
-    )
-
-    adapter = dbt.adapters.factory.get_adapter(config)
-    sql_node = block_parser.parse_remote(sql, node_name)
-    process_node(config, manifest, sql_node)
-    return config, sql_node, adapter
-
-
-def compile_sql(manifest, project_path, sql, node_name="query"):
-    config, node, adapter = _get_operation_node(manifest, project_path, sql, node_name)
-    allow_introspection = str(os.environ.get("__DBT_ALLOW_INTROSPECTION", "1")).lower() in (
-        "true",
-        "1",
-        "on",
-    )
-
-    if allow_introspection:
-        runner = SqlCompileRunner(config, adapter, node, 1, 1)
-    else:
-        runner = SqlCompileRunnerNoIntrospection(config, adapter, node, 1, 1)
-    return runner.safe_run(manifest)
-
-
-def execute_sql(manifest, project_path, sql, node_name="query"):
-    from dbt.task.sql import SqlExecuteRunner
-
-    config, node, adapter = _get_operation_node(manifest, project_path, sql, node_name)
-
-    runner = SqlExecuteRunner(config, adapter, node, 1, 1)
-
-    return runner.safe_run(manifest)
-
-
 def parse_to_manifest(config):
     from dbt.parser.manifest import ManifestLoader
 
diff --git a/core/dbt/parser/sql.py b/core/dbt/parser/sql.py
index 98e28aadc19..c7ba7a361c4 100644
--- a/core/dbt/parser/sql.py
+++ b/core/dbt/parser/sql.py
@@ -1,14 +1,11 @@
 import os
 from dataclasses import dataclass
-from typing import Iterable
 
 from dbt.contracts.graph.manifest import SourceFile
-from dbt.contracts.graph.nodes import SqlNode, Macro
-from dbt.contracts.graph.unparsed import UnparsedMacro
+from dbt.contracts.graph.nodes import SqlNode
 from dbt.exceptions import DbtInternalError
 from dbt.node_types import NodeType
 from dbt.parser.base import SimpleSQLParser
-from dbt.parser.macros import MacroParser
 from dbt.parser.search import FileBlock
@@ -46,17 +43,3 @@ def parse_remote(self, sql: str, name: str) -> SqlNode:
         source_file = SourceFile.remote(sql, self.project.project_name, "sql")
         contents = SqlBlock(block_name=name, file=source_file)
         return self.parse_node(contents)
-
-
-class SqlMacroParser(MacroParser):
-    def parse_remote(self, contents) -> Iterable[Macro]:
-        base = UnparsedMacro(
-            path="from remote system",
-            original_file_path="from remote system",
-            package_name=self.project.project_name,
-            raw_code=contents,
-            language="sql",
-            resource_type=NodeType.Macro,
-        )
-        for node in self.parse_unparsed_macros(base):
-            yield node
diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py
index abd1d812003..8b1883dd339 100644
--- a/core/dbt/task/compile.py
+++ b/core/dbt/task/compile.py
@@ -40,6 +40,18 @@ def compile(self, manifest):
 
 
 class CompileTask(GraphRunnableTask):
+    # We add a new inline node to the manifest during initialization;
+    # it should be removed before the task is complete
+    _inline_node_id = None
+
+    # TODO remove when stu's PR about skip adapter cache is merged
+    def before_run(self, adapter, selected_uids: AbstractSet[str]):
+        if bool(getattr(self.args, "inline", None)):
+            # don't populate adapter cache when doing inline queries
+            pass
+        else:
+            super().before_run(adapter, selected_uids)
+
     def raise_on_first_error(self):
         return True
@@ -130,9 +142,17 @@ def _runtime_initialize(self):
         )
         sql_node = block_parser.parse_remote(self.args.inline, "inline_query")
         process_node(self.config, self.manifest, sql_node)
+        # keep track of the node added to the manifest
+        self._inline_node_id = sql_node.unique_id
 
         super()._runtime_initialize()
 
+    def after_run(self, adapter, results):
+        # remove inline node from manifest
+        if self._inline_node_id:
+            self.manifest.nodes.pop(self._inline_node_id)
+        super().after_run(adapter, results)
+
     def _handle_result(self, result):
         super()._handle_result(result)
 
diff --git a/test/unit/test_lib.py b/test/unit/test_lib.py
deleted file mode 100644
index 418e42f7ac7..00000000000
--- a/test/unit/test_lib.py
+++ /dev/null
@@ -1,62 +0,0 @@
-import os
-import unittest
-from unittest import mock
-from dbt.contracts.results import RunningStatus
-from dbt.lib import compile_sql
-from dbt.adapters.postgres import Plugin
-
-from test.unit.utils import clear_plugin, inject_adapter
-
-
-class MockContext:
-    def __init__(self, node):
-        self.timing = []
-        self.node = mock.MagicMock()
-        self.node._event_status = {"node_status": RunningStatus.Started}
-        self.node.is_ephemeral_model = True
-
-
-def noop_ephemeral_result(*args):
-    return None
-
-
-class TestSqlCompileRunnerNoIntrospection(unittest.TestCase):
-    def setUp(self):
-        self.manifest = {"mock": "manifest"}
-        self.adapter = Plugin.adapter({})
-        self.adapter.connection_for = mock.MagicMock()
-        self.ephemeral_result = lambda: None
-        inject_adapter(self.adapter, Plugin)
-
-    def tearDown(self):
-        clear_plugin(Plugin)
-
-    @mock.patch("dbt.lib._get_operation_node")
-    @mock.patch("dbt.task.sql.GenericSqlRunner.compile")
-    @mock.patch("dbt.task.sql.GenericSqlRunner.ephemeral_result", noop_ephemeral_result)
-    @mock.patch("dbt.task.base.ExecutionContext", MockContext)
-    def test__compile_and_execute__with_connection(self, mock_compile, mock_get_node):
-        """
-        By default, env var for allowing introspection is true, and calling this
-        method should defer to the parent method.
-        """
-        mock_get_node.return_value = ({}, None, self.adapter)
-        compile_sql(self.manifest, "some/path", None)
-
-        mock_compile.assert_called_once_with(self.manifest)
-        self.adapter.connection_for.assert_called_once()
-
-    @mock.patch("dbt.lib._get_operation_node")
-    @mock.patch("dbt.task.sql.GenericSqlRunner.compile")
-    @mock.patch("dbt.task.sql.GenericSqlRunner.ephemeral_result", noop_ephemeral_result)
-    @mock.patch("dbt.task.base.ExecutionContext", MockContext)
-    def test__compile_and_execute__without_connection(self, mock_compile, mock_get_node):
-        """
-        Ensure that compile is called but does not attempt warehouse connection
-        """
-        with mock.patch.dict(os.environ, {"__DBT_ALLOW_INTROSPECTION": "0"}):
-            mock_get_node.return_value = ({}, None, self.adapter)
-            compile_sql(self.manifest, "some/path", None)
-
-        mock_compile.assert_called_once_with(self.manifest)
-        self.adapter.connection_for.assert_not_called()
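With this change, inline compilation for dbt-server goes through CompileTask rather than the removed dbt.lib helpers: `_runtime_initialize` parses the `--inline` SQL into a temporary `inline_query` node, records its unique_id, and `after_run` pops the node from the manifest so back-to-back compiles do not accumulate stale nodes. A sketch, assuming the programmatic entry point shown earlier:

    from dbt.cli.main import dbtRunner

    dbt = dbtRunner()
    # each call adds, compiles, and then removes an inline_query node
    res = dbt.invoke(["compile", "--inline", "select 1 as id"])
    res = dbt.invoke(["compile", "--inline", "select 2 as id"])  # manifest is clean again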
From 762a3f6592a13785193d119c94b7c7343c95881c Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Tue, 11 Apr 2023 11:09:38 -0700
Subject: [PATCH 10/10] Use populate_cache flag

---
 core/dbt/task/compile.py | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py
index 8b1883dd339..b900e9637ff 100644
--- a/core/dbt/task/compile.py
+++ b/core/dbt/task/compile.py
@@ -44,14 +44,6 @@ class CompileTask(GraphRunnableTask):
     # it should be removed before the task is complete
     _inline_node_id = None
 
-    # TODO remove when stu's PR about skip adapter cache is merged
-    def before_run(self, adapter, selected_uids: AbstractSet[str]):
-        if bool(getattr(self.args, "inline", None)):
-            # don't populate adapter cache when doing inline queries
-            pass
-        else:
-            super().before_run(adapter, selected_uids)
-
     def raise_on_first_error(self):
         return True
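With patch 10, skipping cache population for inline queries is no longer special-cased in `before_run`; a caller such as dbt-server opts out through the ordinary group-level flag from patches 01 and 02. A sketch of the equivalent invocation (flag combination assumed, not shown in the patches):

    from dbt.cli.main import dbtRunner

    # the populate_cache flag now covers the inline-compile case as well
    res = dbtRunner().invoke(
        ["--no-populate-cache", "compile", "--inline", "select 1 as id"]
    )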