From 5d9b8c599522fd9e702ce29cddd19a3c40d9e901 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 3 Jul 2018 19:35:07 -0400 Subject: [PATCH] Refactor: split out parsers (#809) Split parsers out into their own classes --- dbt/compilation.py | 1 - dbt/context/runtime.py | 3 +- dbt/loader.py | 26 +- dbt/parser.py | 790 --------------------------------------- dbt/parser/__init__.py | 24 ++ dbt/parser/analysis.py | 9 + dbt/parser/archives.py | 72 ++++ dbt/parser/base.py | 127 +++++++ dbt/parser/base_sql.py | 109 ++++++ dbt/parser/data_test.py | 9 + dbt/parser/hooks.py | 75 ++++ dbt/parser/macros.py | 115 ++++++ dbt/parser/models.py | 8 + dbt/parser/schemas.py | 246 ++++++++++++ dbt/parser/seeds.py | 70 ++++ dbt/parser/util.py | 79 ++++ test/unit/test_parser.py | 38 +- 17 files changed, 977 insertions(+), 824 deletions(-) delete mode 100644 dbt/parser.py create mode 100644 dbt/parser/__init__.py create mode 100644 dbt/parser/analysis.py create mode 100644 dbt/parser/archives.py create mode 100644 dbt/parser/base.py create mode 100644 dbt/parser/base_sql.py create mode 100644 dbt/parser/data_test.py create mode 100644 dbt/parser/hooks.py create mode 100644 dbt/parser/macros.py create mode 100644 dbt/parser/models.py create mode 100644 dbt/parser/schemas.py create mode 100644 dbt/parser/seeds.py create mode 100644 dbt/parser/util.py diff --git a/dbt/compilation.py b/dbt/compilation.py index 0431c85a309..05fc29c0cd1 100644 --- a/dbt/compilation.py +++ b/dbt/compilation.py @@ -20,7 +20,6 @@ import dbt.exceptions import dbt.flags import dbt.loader -import dbt.parser from dbt.clients.system import write_file from dbt.logger import GLOBAL_LOGGER as logger diff --git a/dbt/context/runtime.py b/dbt/context/runtime.py index efe0c375f1f..6b707f82090 100644 --- a/dbt/context/runtime.py +++ b/dbt/context/runtime.py @@ -3,6 +3,7 @@ import dbt.clients.jinja import dbt.context.common import dbt.flags +import dbt.parser from dbt.logger import GLOBAL_LOGGER as logger # noqa @@ -25,7 +26,7 @@ def do_ref(*args): else: dbt.exceptions.ref_invalid_args(model, args) - target_model = dbt.parser.resolve_ref( + target_model = dbt.parser.ParserUtils.resolve_ref( flat_graph, target_model_name, target_model_package, diff --git a/dbt/loader.py b/dbt/loader.py index 2c87c9de4d3..c0cc27dd835 100644 --- a/dbt/loader.py +++ b/dbt/loader.py @@ -1,9 +1,10 @@ import dbt.exceptions -import dbt.parser from dbt.node_types import NodeType from dbt.contracts.graph.parsed import ParsedManifest +import dbt.parser + class GraphLoader(object): @@ -18,7 +19,7 @@ def load_all(cls, root_project, all_projects): nodes.update(loader.load_all(root_project, all_projects, macros)) manifest = ParsedManifest(nodes=nodes, macros=macros) - manifest = dbt.parser.process_refs(manifest, root_project) + manifest = dbt.parser.ParserUtils.process_refs(manifest, root_project) return manifest @classmethod @@ -50,7 +51,7 @@ class MacroLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_macros( + return dbt.parser.MacroParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -78,7 +79,7 @@ def load_all(cls, root_project, all_projects, macros=None): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_sql( + return dbt.parser.ModelParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -93,7 +94,7 @@ class OperationLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_macros( + return dbt.parser.MacroParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -107,7 +108,7 @@ class AnalysisLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_sql( + return dbt.parser.AnalysisParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -122,7 +123,7 @@ class SchemaTestLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_yml( + return dbt.parser.SchemaParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -136,7 +137,7 @@ class DataTestLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_sql( + return dbt.parser.DataTestParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, @@ -157,7 +158,7 @@ def load_all(cls, root_project, all_projects, macros=None): @classmethod def load_project(cls, root_project, all_projects, macros): - return dbt.parser.parse_archives_from_projects(root_project, + return dbt.parser.ArchiveParser.load_and_parse(root_project, all_projects, macros) @@ -170,8 +171,8 @@ def load_all(cls, root_project, all_projects, macros=None): @classmethod def load_project(cls, root_project, all_projects, macros): - return dbt.parser.load_and_parse_run_hooks(root_project, all_projects, - macros) + return dbt.parser.HookParser.load_and_parse(root_project, all_projects, + macros) class SeedLoader(ResourceLoader): @@ -179,13 +180,12 @@ class SeedLoader(ResourceLoader): @classmethod def load_project(cls, root_project, all_projects, project, project_name, macros): - return dbt.parser.load_and_parse_seeds( + return dbt.parser.SeedParser.load_and_parse( package_name=project_name, root_project=root_project, all_projects=all_projects, root_dir=project.get('project-root'), relative_dirs=project.get('data-paths', []), - resource_type=NodeType.Seed, macros=macros) diff --git a/dbt/parser.py b/dbt/parser.py deleted file mode 100644 index da9d27a271d..00000000000 --- a/dbt/parser.py +++ /dev/null @@ -1,790 +0,0 @@ -import copy -import os -import re -import hashlib -import collections - -import dbt.exceptions -import dbt.flags -import dbt.model -import dbt.utils -import dbt.hooks - -import jinja2.runtime -import dbt.clients.jinja -import dbt.clients.yaml_helper -import dbt.clients.agate_helper - -import dbt.context.parser - -import dbt.contracts.project - -from dbt.node_types import NodeType, RunHookType -from dbt.compat import basestring, to_string -from dbt.logger import GLOBAL_LOGGER as logger -from dbt.utils import get_pseudo_test_path, coalesce -from dbt.contracts.graph.unparsed import UnparsedMacro, UnparsedNode -from dbt.contracts.graph.parsed import ParsedMacro, ParsedNode - - -def get_path(resource_type, package_name, resource_name): - return "{}.{}.{}".format(resource_type, package_name, resource_name) - - -def get_test_path(package_name, resource_name): - return get_path(NodeType.Test, package_name, resource_name) - - -def resolve_ref(flat_graph, target_model_name, target_model_package, - current_project, node_package): - - if target_model_package is not None: - return dbt.utils.find_refable_by_name( - flat_graph, - target_model_name, - target_model_package) - - target_model = None - - # first pass: look for models in the current_project - target_model = dbt.utils.find_refable_by_name( - flat_graph, - target_model_name, - current_project) - - if target_model is not None and dbt.utils.is_enabled(target_model): - return target_model - - # second pass: look for models in the node's package - target_model = dbt.utils.find_refable_by_name( - flat_graph, - target_model_name, - node_package) - - if target_model is not None and dbt.utils.is_enabled(target_model): - return target_model - - # final pass: look for models in any package - # todo: exclude the packages we have already searched. overriding - # a package model in another package doesn't necessarily work atm - return dbt.utils.find_refable_by_name( - flat_graph, - target_model_name, - None) - - -def process_refs(manifest, current_project): - flat_graph = manifest.to_flat_graph() - for _, node in manifest.nodes.items(): - target_model = None - target_model_name = None - target_model_package = None - - for ref in node.get('refs', []): - if len(ref) == 1: - target_model_name = ref[0] - elif len(ref) == 2: - target_model_package, target_model_name = ref - - target_model = resolve_ref( - flat_graph, - target_model_name, - target_model_package, - current_project, - node.get('package_name')) - - if target_model is None: - # This may raise. Even if it doesn't, we don't want to add - # this node to the graph b/c there is no destination node - node.get('config', {})['enabled'] = False - dbt.utils.invalid_ref_fail_unless_test(node, - target_model_name, - target_model_package) - continue - - target_model_id = target_model.get('unique_id') - - node['depends_on']['nodes'].append(target_model_id) - flat_graph['nodes'][node['unique_id']] = node - - return manifest - - -def get_fqn(path, package_project_config, extra=[]): - parts = dbt.utils.split_path(path) - name, _ = os.path.splitext(parts[-1]) - fqn = ([package_project_config.get('name')] + - parts[:-1] + - extra + - [name]) - - return fqn - - -def parse_macro_file(macro_file_path, - macro_file_contents, - root_path, - package_name, - resource_type, - tags=None, - context=None): - - logger.debug("Parsing {}".format(macro_file_path)) - - to_return = {} - - if tags is None: - tags = [] - - context = {} - - # change these to actual kwargs - base_node = UnparsedMacro( - path=macro_file_path, - original_file_path=macro_file_path, - package_name=package_name, - raw_sql=macro_file_contents, - root_path=root_path, - ) - - try: - template = dbt.clients.jinja.get_template( - macro_file_contents, context, node=base_node) - except dbt.exceptions.CompilationException as e: - e.node = base_node - raise e - - for key, item in template.module.__dict__.items(): - if type(item) != jinja2.runtime.Macro: - continue - - node_type = None - if key.startswith(dbt.utils.MACRO_PREFIX): - node_type = NodeType.Macro - name = key.replace(dbt.utils.MACRO_PREFIX, '') - - elif key.startswith(dbt.utils.OPERATION_PREFIX): - node_type = NodeType.Operation - name = key.replace(dbt.utils.OPERATION_PREFIX, '') - - if node_type != resource_type: - continue - - unique_id = get_path(resource_type, package_name, name) - - merged = dbt.utils.deep_merge( - base_node.serialize(), - { - 'name': name, - 'unique_id': unique_id, - 'tags': tags, - 'resource_type': resource_type, - 'depends_on': {'macros': []}, - }) - - new_node = ParsedMacro( - template=template, - **merged) - - to_return[unique_id] = new_node - - return to_return - - -def parse_node(node, node_path, root_project_config, package_project_config, - all_projects, tags=None, fqn_extra=None, fqn=None, macros=None, - agate_table=None, archive_config=None): - """Parse a node, given an UnparsedNode and any other required information. - - agate_table should be set if the node came from a seed file. - archive_config should be set if the node is an Archive node. - """ - logger.debug("Parsing {}".format(node_path)) - - node = node.serialize() - - if agate_table is not None: - node['agate_table'] = agate_table - tags = coalesce(tags, []) - fqn_extra = coalesce(fqn_extra, []) - macros = coalesce(macros, {}) - - node.update({ - 'refs': [], - 'depends_on': { - 'nodes': [], - 'macros': [], - } - }) - - if fqn is None: - fqn = get_fqn(node.get('path'), package_project_config, fqn_extra) - - config = dbt.model.SourceConfig( - root_project_config, - package_project_config, - fqn, - node['resource_type']) - - node['unique_id'] = node_path - node['empty'] = ('raw_sql' in node and len(node['raw_sql'].strip()) == 0) - node['fqn'] = fqn - node['tags'] = tags - node['config_reference'] = config - - # Set this temporarily. Not the full config yet (as config() hasn't been - # called from jinja yet). But the Var() call below needs info about project - # level configs b/c they might contain refs. TODO: Restructure this? - config_dict = coalesce(archive_config, {}) - config_dict.update(config.config) - node['config'] = config_dict - - # Set this temporarily so get_rendered() below has access to a schema - profile = dbt.utils.get_profile_from_project(root_project_config) - default_schema = profile.get('schema', 'public') - node['schema'] = default_schema - - context = dbt.context.parser.generate(node, root_project_config, - {"macros": macros}) - - dbt.clients.jinja.get_rendered( - node.get('raw_sql'), context, node, - capture_macros=True) - - # Clean up any open connections opened by adapter functions that hit the db - db_wrapper = context['adapter'] - adapter = db_wrapper.adapter - profile = db_wrapper.profile - adapter.release_connection(profile, node.get('name')) - - # Special macro defined in the global project - schema_override = config.config.get('schema') - get_schema = context.get('generate_schema_name', lambda x: default_schema) - node['schema'] = get_schema(schema_override) - - # Overwrite node config - config_dict = node.get('config', {}) - config_dict.update(config.config) - node['config'] = config_dict - - for hook_type in dbt.hooks.ModelHookType.Both: - node['config'][hook_type] = dbt.hooks.get_hooks(node, hook_type) - - del node['config_reference'] - - return ParsedNode(**node) - - -def parse_sql_nodes(nodes, root_project, projects, tags=None, macros=None): - if tags is None: - tags = [] - - if macros is None: - macros = {} - - to_return = {} - - for n in nodes: - node = dbt.contracts.graph.unparsed.UnparsedNode(**n) - package_name = node.get('package_name') - - node_path = get_path(node.get('resource_type'), - package_name, - node.get('name')) - node_parsed = parse_node(node, - node_path, - root_project, - projects.get(package_name), - projects, - tags=tags, - macros=macros) - - # Ignore disabled nodes - if not node_parsed['config']['enabled']: - continue - - # Check for duplicate model names - existing_node = to_return.get(node_path) - if existing_node is not None: - dbt.exceptions.raise_duplicate_resource_name( - existing_node, node_parsed) - - to_return[node_path] = node_parsed - - return to_return - - -def load_and_parse_sql(package_name, root_project, all_projects, root_dir, - relative_dirs, resource_type, tags=None, macros=None): - extension = "[!.#~]*.sql" - - if tags is None: - tags = [] - - if macros is None: - macros = {} - - if dbt.flags.STRICT_MODE: - dbt.contracts.project.ProjectList(**all_projects) - - file_matches = dbt.clients.system.find_matching( - root_dir, - relative_dirs, - extension) - - result = [] - - for file_match in file_matches: - file_contents = dbt.clients.system.load_file_contents( - file_match.get('absolute_path')) - - parts = dbt.utils.split_path(file_match.get('relative_path', '')) - name, _ = os.path.splitext(parts[-1]) - - if resource_type == NodeType.Test: - path = dbt.utils.get_pseudo_test_path( - name, file_match.get('relative_path'), 'data_test') - elif resource_type == NodeType.Analysis: - path = os.path.join('analysis', file_match.get('relative_path')) - else: - path = file_match.get('relative_path') - - original_file_path = os.path.join( - file_match.get('searched_path'), - path) - - result.append({ - 'name': name, - 'root_path': root_dir, - 'resource_type': resource_type, - 'path': path, - 'original_file_path': original_file_path, - 'package_name': package_name, - 'raw_sql': file_contents - }) - - return parse_sql_nodes(result, root_project, all_projects, tags, macros) - - -def get_hooks_from_project(project_cfg, hook_type): - hooks = project_cfg.get(hook_type, []) - - if type(hooks) not in (list, tuple): - hooks = [hooks] - - return hooks - - -def get_hooks(all_projects, hook_type): - project_hooks = collections.defaultdict(list) - - for project_name, project in all_projects.items(): - hooks = get_hooks_from_project(project, hook_type) - project_hooks[project_name].extend(hooks) - - return project_hooks - - -def load_and_parse_run_hook_type(root_project, all_projects, hook_type, - macros=None): - - if dbt.flags.STRICT_MODE: - dbt.contracts.project.ProjectList(**all_projects) - - project_hooks = get_hooks(all_projects, hook_type) - - result = [] - for project_name, hooks in project_hooks.items(): - project = all_projects[project_name] - - for i, hook in enumerate(hooks): - hook_name = '{}-{}-{}'.format(project_name, hook_type, i) - hook_path = dbt.utils.get_pseudo_hook_path(hook_name) - - result.append({ - 'name': hook_name, - 'root_path': "{}/dbt_project.yml".format(project_name), - 'resource_type': NodeType.Operation, - 'path': hook_path, - 'original_file_path': hook_path, - 'package_name': project_name, - 'raw_sql': hook, - 'index': i - }) - - tags = [hook_type] - return parse_sql_nodes(result, root_project, all_projects, tags=tags, - macros=macros) - - -def load_and_parse_run_hooks(root_project, all_projects, macros=None): - if macros is None: - macros = {} - - hook_nodes = {} - for hook_type in RunHookType.Both: - project_hooks = load_and_parse_run_hook_type(root_project, - all_projects, - hook_type, - macros=macros) - hook_nodes.update(project_hooks) - - return hook_nodes - - -def load_and_parse_macros(package_name, root_project, all_projects, root_dir, - relative_dirs, resource_type, tags=None): - extension = "[!.#~]*.sql" - - if tags is None: - tags = [] - - if dbt.flags.STRICT_MODE: - dbt.contracts.project.ProjectList(**all_projects) - - file_matches = dbt.clients.system.find_matching( - root_dir, - relative_dirs, - extension) - - result = {} - - for file_match in file_matches: - file_contents = dbt.clients.system.load_file_contents( - file_match.get('absolute_path')) - - result.update( - parse_macro_file( - file_match.get('relative_path'), - file_contents, - root_dir, - package_name, - resource_type)) - - return result - - -def get_parsed_schema_test(test_node, test_type, model_name, config, - root_project, projects, macros): - - package_name = test_node.get('package_name') - test_namespace = None - original_test_type = test_type - split = test_type.split('.') - - if len(split) > 1: - test_type = split[1] - package_name = split[0] - test_namespace = package_name - - source_package = projects.get(package_name) - if source_package is None: - desc = '"{}" test on model "{}"'.format(original_test_type, model_name) - dbt.exceptions.raise_dep_not_found(test_node, desc, test_namespace) - - return parse_schema_test( - test_node, - model_name, - config, - test_namespace, - test_type, - root_project, - source_package, - all_projects=projects, - macros=macros) - - -def parse_schema_tests(tests, root_project, projects, macros=None): - to_return = {} - - for test in tests: - raw_yml = test.get('raw_yml') - test_name = "{}:{}".format(test.get('package_name'), test.get('path')) - - try: - test_yml = dbt.clients.yaml_helper.load_yaml_text(raw_yml) - except dbt.exceptions.ValidationException as e: - test_yml = None - logger.info("Error reading {} - Skipping\n{}".format(test_name, e)) - - if test_yml is None: - continue - - no_tests_warning = ("* WARNING: No constraints found for model" - " '{}' in file {}\n") - for model_name, test_spec in test_yml.items(): - if test_spec is None or test_spec.get('constraints') is None: - test_path = test.get('original_file_path', '') - logger.warning(no_tests_warning.format(model_name, test_path)) - continue - - for test_type, configs in test_spec.get('constraints', {}).items(): - if configs is None: - continue - - if not isinstance(configs, (list, tuple)): - - dbt.utils.compiler_warning( - model_name, - "Invalid test config given in {} near {}".format( - test.get('path'), - configs)) - continue - - for config in configs: - to_add = get_parsed_schema_test( - test, test_type, model_name, config, - root_project, projects, macros) - - if to_add is not None: - to_return[to_add.get('unique_id')] = to_add - - return to_return - - -def get_nice_schema_test_name(test_type, test_name, args): - - flat_args = [] - for arg_name in sorted(args): - arg_val = args[arg_name] - - if isinstance(arg_val, dict): - parts = arg_val.values() - elif isinstance(arg_val, (list, tuple)): - parts = arg_val - else: - parts = [arg_val] - - flat_args.extend([str(part) for part in parts]) - - clean_flat_args = [re.sub('[^0-9a-zA-Z_]+', '_', arg) for arg in flat_args] - unique = "__".join(clean_flat_args) - - cutoff = 32 - if len(unique) <= cutoff: - label = unique - else: - label = hashlib.md5(unique.encode('utf-8')).hexdigest() - - filename = '{}_{}_{}'.format(test_type, test_name, label) - name = '{}_{}_{}'.format(test_type, test_name, unique) - - return filename, name - - -def as_kwarg(key, value): - test_value = to_string(value) - is_function = re.match(r'^\s*(ref|var)\s*\(.+\)\s*$', test_value) - - # if the value is a function, don't wrap it in quotes! - if is_function: - formatted_value = value - else: - formatted_value = value.__repr__() - - return "{key}={value}".format(key=key, value=formatted_value) - - -def parse_schema_test(test_base, model_name, test_config, test_namespace, - test_type, root_project_config, package_project_config, - all_projects, macros=None): - - if isinstance(test_config, (basestring, int, float, bool)): - test_args = {'arg': test_config} - else: - test_args = test_config - - # sort the dict so the keys are rendered deterministically (for tests) - kwargs = [as_kwarg(key, test_args[key]) for key in sorted(test_args)] - - if test_namespace is None: - macro_name = "test_{}".format(test_type) - else: - macro_name = "{}.test_{}".format(test_namespace, test_type) - - raw_sql = "{{{{ {macro}(model=ref('{model}'), {kwargs}) }}}}".format(**{ - 'model': model_name, - 'macro': macro_name, - 'kwargs': ", ".join(kwargs) - }) - - base_path = test_base.get('path') - hashed_name, full_name = get_nice_schema_test_name(test_type, model_name, - test_args) - - hashed_path = get_pseudo_test_path(hashed_name, base_path, 'schema_test') - full_path = get_pseudo_test_path(full_name, base_path, 'schema_test') - - # supply our own fqn which overrides the hashed version from the path - fqn_override = get_fqn(full_path, package_project_config) - - to_return = UnparsedNode( - name=full_name, - resource_type=test_base.get('resource_type'), - package_name=test_base.get('package_name'), - root_path=test_base.get('root_path'), - path=hashed_path, - original_file_path=test_base.get('original_file_path'), - raw_sql=raw_sql - ) - - return parse_node(to_return, - get_test_path(test_base.get('package_name'), - full_name), - root_project_config, - package_project_config, - all_projects, - tags=['schema'], - fqn_extra=None, - fqn=fqn_override, - macros=macros) - - -def load_and_parse_yml(package_name, root_project, all_projects, root_dir, - relative_dirs, macros=None): - extension = "[!.#~]*.yml" - - if dbt.flags.STRICT_MODE: - dbt.contracts.project.ProjectList(**all_projects) - - file_matches = dbt.clients.system.find_matching( - root_dir, - relative_dirs, - extension) - - result = [] - - for file_match in file_matches: - file_contents = dbt.clients.system.load_file_contents( - file_match.get('absolute_path'), strip=False) - - original_file_path = os.path.join(file_match.get('searched_path'), - file_match.get('relative_path')) - - parts = dbt.utils.split_path(file_match.get('relative_path', '')) - name, _ = os.path.splitext(parts[-1]) - - result.append({ - 'name': name, - 'root_path': root_dir, - 'resource_type': NodeType.Test, - 'path': file_match.get('relative_path'), - 'original_file_path': original_file_path, - 'package_name': package_name, - 'raw_yml': file_contents - }) - - return parse_schema_tests(result, root_project, all_projects, macros) - - -def parse_archives_from_projects(root_project, all_projects, macros=None): - archives = [] - to_return = {} - - for name, project in all_projects.items(): - archives = archives + parse_archives_from_project(project) - - # We're going to have a similar issue with parsed nodes, if we want to - # make parse_node return those. - for a in archives: - # archives have a config, but that would make for an invalid - # UnparsedNode, so remove it and pass it along to parse_node as an - # argument. - archive_config = a.pop('config') - archive = UnparsedNode(**a) - node_path = get_path(archive.get('resource_type'), - archive.get('package_name'), - archive.get('name')) - - to_return[node_path] = parse_node( - archive, - node_path, - root_project, - all_projects.get(archive.get('package_name')), - all_projects, - macros=macros, - archive_config=archive_config) - - return to_return - - -def parse_archives_from_project(project): - archives = [] - archive_configs = project.get('archive', []) - - for archive_config in archive_configs: - tables = archive_config.get('tables') - - if tables is None: - continue - - for table in tables: - config = table.copy() - config['source_schema'] = archive_config.get('source_schema') - config['target_schema'] = archive_config.get('target_schema') - - fake_path = [config['target_schema'], config['target_table']] - archives.append({ - 'name': table.get('target_table'), - 'root_path': project.get('project-root'), - 'resource_type': NodeType.Archive, - 'path': os.path.join('archive', *fake_path), - 'original_file_path': 'dbt_project.yml', - 'package_name': project.get('name'), - 'config': config, - 'raw_sql': '{{config(materialized="archive")}} -- noop' - }) - - return archives - - -def parse_seed_file(file_match, root_dir, package_name): - """Parse the given seed file, returning an UnparsedNode and the agate - table. - """ - abspath = file_match['absolute_path'] - logger.debug("Parsing {}".format(abspath)) - to_return = {} - table_name = os.path.basename(abspath)[:-4] - node = UnparsedNode( - path=file_match['relative_path'], - name=table_name, - root_path=root_dir, - resource_type=NodeType.Seed, - # Give this raw_sql so it conforms to the node spec, - # use dummy text so it doesn't look like an empty node - raw_sql='-- csv --', - package_name=package_name, - original_file_path=os.path.join(file_match.get('searched_path'), - file_match.get('relative_path')), - ) - try: - table = dbt.clients.agate_helper.from_csv(abspath) - except ValueError as e: - dbt.exceptions.raise_compiler_error(str(e), node) - table.original_abspath = abspath - return node, table - - -def load_and_parse_seeds(package_name, root_project, all_projects, root_dir, - relative_dirs, resource_type, tags=None, macros=None): - extension = "[!.#~]*.csv" - if dbt.flags.STRICT_MODE: - dbt.contracts.project.ProjectList(**all_projects) - file_matches = dbt.clients.system.find_matching( - root_dir, - relative_dirs, - extension) - result = {} - for file_match in file_matches: - node, agate_table = parse_seed_file(file_match, root_dir, - package_name) - node_path = get_path(NodeType.Seed, package_name, node.name) - parsed = parse_node(node, node_path, root_project, - all_projects.get(package_name), - all_projects, tags=tags, macros=macros, - agate_table=agate_table) - # parsed['empty'] = False - result[node_path] = parsed - - return result diff --git a/dbt/parser/__init__.py b/dbt/parser/__init__.py new file mode 100644 index 00000000000..ed43b91093c --- /dev/null +++ b/dbt/parser/__init__.py @@ -0,0 +1,24 @@ + +from .analysis import AnalysisParser +from .archives import ArchiveParser +from .data_test import DataTestParser +from .hooks import HookParser +from .macros import MacroParser +from .models import ModelParser +from .schemas import SchemaParser +from .seeds import SeedParser + +from .util import ParserUtils + +__all__ = [ + 'AnalysisParser', + 'ArchiveParser', + 'DataTestParser', + 'HookParser', + 'MacroParser', + 'ModelParser', + 'SchemaParser', + 'SeedParser', + + 'ParserUtils', +] diff --git a/dbt/parser/analysis.py b/dbt/parser/analysis.py new file mode 100644 index 00000000000..5d218544983 --- /dev/null +++ b/dbt/parser/analysis.py @@ -0,0 +1,9 @@ + +from dbt.parser.base_sql import BaseSqlParser +import os + + +class AnalysisParser(BaseSqlParser): + @classmethod + def get_compiled_path(cls, name, relative_path): + return os.path.join('analysis', relative_path) diff --git a/dbt/parser/archives.py b/dbt/parser/archives.py new file mode 100644 index 00000000000..e73c18edec8 --- /dev/null +++ b/dbt/parser/archives.py @@ -0,0 +1,72 @@ + +from dbt.contracts.graph.unparsed import UnparsedNode +from dbt.node_types import NodeType +from dbt.parser.base import BaseParser + +import os + + +class ArchiveParser(BaseParser): + @classmethod + def parse_archives_from_project(cls, project): + archives = [] + archive_configs = project.get('archive', []) + + for archive_config in archive_configs: + tables = archive_config.get('tables') + + if tables is None: + continue + + for table in tables: + config = table.copy() + config['source_schema'] = archive_config.get('source_schema') + config['target_schema'] = archive_config.get('target_schema') + + fake_path = [config['target_schema'], config['target_table']] + archives.append({ + 'name': table.get('target_table'), + 'root_path': project.get('project-root'), + 'resource_type': NodeType.Archive, + 'path': os.path.join('archive', *fake_path), + 'original_file_path': 'dbt_project.yml', + 'package_name': project.get('name'), + 'config': config, + 'raw_sql': '{{config(materialized="archive")}} -- noop' + }) + + return archives + + @classmethod + def load_and_parse(cls, root_project, all_projects, macros=None): + """Load and parse archives in a list of projects. Returns a dict + that maps unique ids onto ParsedNodes""" + + archives = [] + to_return = {} + + for name, project in all_projects.items(): + archives = archives + cls.parse_archives_from_project(project) + + # We're going to have a similar issue with parsed nodes, if we want to + # make parse_node return those. + for a in archives: + # archives have a config, but that would make for an invalid + # UnparsedNode, so remove it and pass it along to parse_node as an + # argument. + archive_config = a.pop('config') + archive = UnparsedNode(**a) + node_path = cls.get_path(archive.get('resource_type'), + archive.get('package_name'), + archive.get('name')) + + to_return[node_path] = cls.parse_node( + archive, + node_path, + root_project, + all_projects.get(archive.get('package_name')), + all_projects, + macros=macros, + archive_config=archive_config) + + return to_return diff --git a/dbt/parser/base.py b/dbt/parser/base.py new file mode 100644 index 00000000000..be8b52ba3f5 --- /dev/null +++ b/dbt/parser/base.py @@ -0,0 +1,127 @@ +import os + +import dbt.exceptions +import dbt.flags +import dbt.model +import dbt.utils +import dbt.hooks +import dbt.clients.jinja +import dbt.context.parser + +from dbt.utils import coalesce +from dbt.logger import GLOBAL_LOGGER as logger +from dbt.contracts.graph.parsed import ParsedNode + + +class BaseParser(object): + + @classmethod + def load_and_parse(cls, *args, **kwargs): + raise dbt.exceptions.NotImplementedException("Not implemented") + + @classmethod + def get_path(cls, resource_type, package_name, resource_name): + """Returns a unique identifier for a resource""" + + return "{}.{}.{}".format(resource_type, package_name, resource_name) + + @classmethod + def get_fqn(cls, path, package_project_config, extra=[]): + parts = dbt.utils.split_path(path) + name, _ = os.path.splitext(parts[-1]) + fqn = ([package_project_config.get('name')] + + parts[:-1] + + extra + + [name]) + + return fqn + + @classmethod + def parse_node(cls, node, node_path, root_project_config, + package_project_config, all_projects, + tags=None, fqn_extra=None, fqn=None, macros=None, + agate_table=None, archive_config=None): + """Parse a node, given an UnparsedNode and any other required information. + + agate_table should be set if the node came from a seed file. + archive_config should be set if the node is an Archive node. + """ + logger.debug("Parsing {}".format(node_path)) + + node = node.serialize() + + if agate_table is not None: + node['agate_table'] = agate_table + tags = coalesce(tags, []) + fqn_extra = coalesce(fqn_extra, []) + macros = coalesce(macros, {}) + + node.update({ + 'refs': [], + 'depends_on': { + 'nodes': [], + 'macros': [], + } + }) + + if fqn is None: + fqn = cls.get_fqn(node.get('path'), package_project_config, + fqn_extra) + + config = dbt.model.SourceConfig( + root_project_config, + package_project_config, + fqn, + node['resource_type']) + + node['unique_id'] = node_path + node['empty'] = ( + 'raw_sql' in node and len(node['raw_sql'].strip()) == 0 + ) + node['fqn'] = fqn + node['tags'] = tags + node['config_reference'] = config + + # Set this temporarily. Not the full config yet (as config() hasn't + # been called from jinja yet). But the Var() call below needs info + # about project level configs b/c they might contain refs. + # TODO: Restructure this? + config_dict = coalesce(archive_config, {}) + config_dict.update(config.config) + node['config'] = config_dict + + # Set this temporarily so get_rendered() below has access to a schema + profile = dbt.utils.get_profile_from_project(root_project_config) + default_schema = profile.get('schema', 'public') + node['schema'] = default_schema + + context = dbt.context.parser.generate(node, root_project_config, + {"macros": macros}) + + dbt.clients.jinja.get_rendered( + node.get('raw_sql'), context, node, + capture_macros=True) + + # Clean up any open conns opened by adapter functions that hit the db + db_wrapper = context['adapter'] + adapter = db_wrapper.adapter + profile = db_wrapper.profile + adapter.release_connection(profile, node.get('name')) + + # Special macro defined in the global project + schema_override = config.config.get('schema') + get_schema = context.get('generate_schema_name', + lambda x: default_schema) + node['schema'] = get_schema(schema_override) + + # Overwrite node config + config_dict = node.get('config', {}) + config_dict.update(config.config) + node['config'] = config_dict + + for hook_type in dbt.hooks.ModelHookType.Both: + node['config'][hook_type] = dbt.hooks.get_hooks(node, hook_type) + + del node['config_reference'] + + return ParsedNode(**node) diff --git a/dbt/parser/base_sql.py b/dbt/parser/base_sql.py new file mode 100644 index 00000000000..236b04fa910 --- /dev/null +++ b/dbt/parser/base_sql.py @@ -0,0 +1,109 @@ + +import os + +import dbt.contracts.project +import dbt.exceptions +import dbt.clients.system +import dbt.utils +import dbt.flags + +from dbt.contracts.graph.unparsed import UnparsedNode +from dbt.parser.base import BaseParser + + +class BaseSqlParser(BaseParser): + @classmethod + def get_compiled_path(cls, name, relative_path): + raise dbt.exceptions.NotImplementedException("Not implemented") + + @classmethod + def load_and_parse(cls, package_name, root_project, all_projects, root_dir, + relative_dirs, resource_type, tags=None, macros=None): + """Load and parse models in a list of directories. Returns a dict + that maps unique ids onto ParsedNodes""" + + extension = "[!.#~]*.sql" + + if tags is None: + tags = [] + + if macros is None: + macros = {} + + if dbt.flags.STRICT_MODE: + dbt.contracts.project.ProjectList(**all_projects) + + file_matches = dbt.clients.system.find_matching( + root_dir, + relative_dirs, + extension) + + result = [] + + for file_match in file_matches: + file_contents = dbt.clients.system.load_file_contents( + file_match.get('absolute_path')) + + parts = dbt.utils.split_path(file_match.get('relative_path', '')) + name, _ = os.path.splitext(parts[-1]) + + path = cls.get_compiled_path(name, file_match.get('relative_path')) + + original_file_path = os.path.join( + file_match.get('searched_path'), + path) + + result.append({ + 'name': name, + 'root_path': root_dir, + 'resource_type': resource_type, + 'path': path, + 'original_file_path': original_file_path, + 'package_name': package_name, + 'raw_sql': file_contents + }) + + return cls.parse_sql_nodes(result, root_project, all_projects, tags, + macros) + + @classmethod + def parse_sql_nodes(cls, nodes, root_project, projects, + tags=None, macros=None): + + if tags is None: + tags = [] + + if macros is None: + macros = {} + + to_return = {} + + for n in nodes: + node = UnparsedNode(**n) + package_name = node.get('package_name') + + node_path = cls.get_path(node.get('resource_type'), + package_name, + node.get('name')) + + node_parsed = cls.parse_node(node, + node_path, + root_project, + projects.get(package_name), + projects, + tags=tags, + macros=macros) + + # Ignore disabled nodes + if not node_parsed['config']['enabled']: + continue + + # Check for duplicate model names + existing_node = to_return.get(node_path) + if existing_node is not None: + dbt.exceptions.raise_duplicate_resource_name( + existing_node, node_parsed) + + to_return[node_path] = node_parsed + + return to_return diff --git a/dbt/parser/data_test.py b/dbt/parser/data_test.py new file mode 100644 index 00000000000..5169ba91e13 --- /dev/null +++ b/dbt/parser/data_test.py @@ -0,0 +1,9 @@ + +from dbt.parser.base_sql import BaseSqlParser +import dbt.utils + + +class DataTestParser(BaseSqlParser): + @classmethod + def get_compiled_path(cls, name, relative_path): + return dbt.utils.get_pseudo_test_path(name, relative_path, 'data_test') diff --git a/dbt/parser/hooks.py b/dbt/parser/hooks.py new file mode 100644 index 00000000000..3f8d513bd05 --- /dev/null +++ b/dbt/parser/hooks.py @@ -0,0 +1,75 @@ + +import collections + +import dbt.flags +import dbt.contracts.project +import dbt.utils + +from dbt.parser.base_sql import BaseSqlParser +from dbt.node_types import NodeType, RunHookType + + +class HookParser(BaseSqlParser): + @classmethod + def get_hooks_from_project(cls, project_cfg, hook_type): + hooks = project_cfg.get(hook_type, []) + + if type(hooks) not in (list, tuple): + hooks = [hooks] + + return hooks + + @classmethod + def get_hooks(cls, all_projects, hook_type): + project_hooks = collections.defaultdict(list) + + for project_name, project in all_projects.items(): + hooks = cls.get_hooks_from_project(project, hook_type) + project_hooks[project_name].extend(hooks) + + return project_hooks + + @classmethod + def load_and_parse_run_hook_type(cls, root_project, all_projects, + hook_type, macros=None): + + if dbt.flags.STRICT_MODE: + dbt.contracts.project.ProjectList(**all_projects) + + project_hooks = cls.get_hooks(all_projects, hook_type) + + result = [] + for project_name, hooks in project_hooks.items(): + for i, hook in enumerate(hooks): + hook_name = '{}-{}-{}'.format(project_name, hook_type, i) + hook_path = dbt.utils.get_pseudo_hook_path(hook_name) + + result.append({ + 'name': hook_name, + 'root_path': "{}/dbt_project.yml".format(project_name), + 'resource_type': NodeType.Operation, + 'path': hook_path, + 'original_file_path': hook_path, + 'package_name': project_name, + 'raw_sql': hook, + 'index': i + }) + + tags = [hook_type] + return cls.parse_sql_nodes(result, root_project, all_projects, + tags=tags, macros=macros) + + @classmethod + def load_and_parse(cls, root_project, all_projects, macros=None): + if macros is None: + macros = {} + + hook_nodes = {} + for hook_type in RunHookType.Both: + project_hooks = cls.load_and_parse_run_hook_type(root_project, + all_projects, + hook_type, + macros=macros) + hook_nodes.update(project_hooks) + + return hook_nodes diff --git a/dbt/parser/macros.py b/dbt/parser/macros.py new file mode 100644 index 00000000000..cb1711c9eff --- /dev/null +++ b/dbt/parser/macros.py @@ -0,0 +1,115 @@ + +import jinja2.runtime + +import dbt.exceptions +import dbt.flags +import dbt.utils + +import dbt.clients.jinja +import dbt.clients.system +import dbt.contracts.project + +from dbt.parser.base import BaseParser +from dbt.node_types import NodeType +from dbt.logger import GLOBAL_LOGGER as logger +from dbt.contracts.graph.unparsed import UnparsedMacro +from dbt.contracts.graph.parsed import ParsedMacro + + +class MacroParser(BaseParser): + @classmethod + def parse_macro_file(cls, macro_file_path, macro_file_contents, root_path, + package_name, resource_type, tags=None, context=None): + + logger.debug("Parsing {}".format(macro_file_path)) + + to_return = {} + + if tags is None: + tags = [] + + context = {} + + # change these to actual kwargs + base_node = UnparsedMacro( + path=macro_file_path, + original_file_path=macro_file_path, + package_name=package_name, + raw_sql=macro_file_contents, + root_path=root_path, + ) + + try: + template = dbt.clients.jinja.get_template( + macro_file_contents, context, node=base_node) + except dbt.exceptions.CompilationException as e: + e.node = base_node + raise e + + for key, item in template.module.__dict__.items(): + if type(item) != jinja2.runtime.Macro: + continue + + node_type = None + if key.startswith(dbt.utils.MACRO_PREFIX): + node_type = NodeType.Macro + name = key.replace(dbt.utils.MACRO_PREFIX, '') + + elif key.startswith(dbt.utils.OPERATION_PREFIX): + node_type = NodeType.Operation + name = key.replace(dbt.utils.OPERATION_PREFIX, '') + + if node_type != resource_type: + continue + + unique_id = cls.get_path(resource_type, package_name, name) + + merged = dbt.utils.deep_merge( + base_node.serialize(), + { + 'name': name, + 'unique_id': unique_id, + 'tags': tags, + 'resource_type': resource_type, + 'depends_on': {'macros': []}, + }) + + new_node = ParsedMacro( + template=template, + **merged) + + to_return[unique_id] = new_node + + return to_return + + @classmethod + def load_and_parse(cls, package_name, root_project, all_projects, root_dir, + relative_dirs, resource_type, tags=None): + extension = "[!.#~]*.sql" + + if tags is None: + tags = [] + + if dbt.flags.STRICT_MODE: + dbt.contracts.project.ProjectList(**all_projects) + + file_matches = dbt.clients.system.find_matching( + root_dir, + relative_dirs, + extension) + + result = {} + + for file_match in file_matches: + file_contents = dbt.clients.system.load_file_contents( + file_match.get('absolute_path')) + + result.update( + cls.parse_macro_file( + file_match.get('relative_path'), + file_contents, + root_dir, + package_name, + resource_type)) + + return result diff --git a/dbt/parser/models.py b/dbt/parser/models.py new file mode 100644 index 00000000000..1b1c9eaa7d0 --- /dev/null +++ b/dbt/parser/models.py @@ -0,0 +1,8 @@ + +from dbt.parser.base_sql import BaseSqlParser + + +class ModelParser(BaseSqlParser): + @classmethod + def get_compiled_path(cls, name, relative_path): + return relative_path diff --git a/dbt/parser/schemas.py b/dbt/parser/schemas.py new file mode 100644 index 00000000000..0f545e91182 --- /dev/null +++ b/dbt/parser/schemas.py @@ -0,0 +1,246 @@ +import os +import re +import hashlib + +import dbt.exceptions +import dbt.flags +import dbt.utils + +import dbt.clients.yaml_helper +import dbt.context.parser +import dbt.contracts.project + +from dbt.node_types import NodeType +from dbt.compat import basestring, to_string +from dbt.logger import GLOBAL_LOGGER as logger +from dbt.utils import get_pseudo_test_path +from dbt.contracts.graph.unparsed import UnparsedNode +from dbt.parser.base import BaseParser + + +def get_nice_schema_test_name(test_type, test_name, args): + + flat_args = [] + for arg_name in sorted(args): + arg_val = args[arg_name] + + if isinstance(arg_val, dict): + parts = arg_val.values() + elif isinstance(arg_val, (list, tuple)): + parts = arg_val + else: + parts = [arg_val] + + flat_args.extend([str(part) for part in parts]) + + clean_flat_args = [re.sub('[^0-9a-zA-Z_]+', '_', arg) for arg in flat_args] + unique = "__".join(clean_flat_args) + + cutoff = 32 + if len(unique) <= cutoff: + label = unique + else: + label = hashlib.md5(unique.encode('utf-8')).hexdigest() + + filename = '{}_{}_{}'.format(test_type, test_name, label) + name = '{}_{}_{}'.format(test_type, test_name, unique) + + return filename, name + + +def as_kwarg(key, value): + test_value = to_string(value) + is_function = re.match(r'^\s*(ref|var)\s*\(.+\)\s*$', test_value) + + # if the value is a function, don't wrap it in quotes! + if is_function: + formatted_value = value + else: + formatted_value = value.__repr__() + + return "{key}={value}".format(key=key, value=formatted_value) + + +class SchemaParser(BaseParser): + + @classmethod + def parse_schema_test(cls, test_base, model_name, test_config, + test_namespace, test_type, root_project_config, + package_project_config, all_projects, macros=None): + + if isinstance(test_config, (basestring, int, float, bool)): + test_args = {'arg': test_config} + else: + test_args = test_config + + # sort the dict so the keys are rendered deterministically (for tests) + kwargs = [as_kwarg(key, test_args[key]) for key in sorted(test_args)] + + if test_namespace is None: + macro_name = "test_{}".format(test_type) + else: + macro_name = "{}.test_{}".format(test_namespace, test_type) + + raw_sql = "{{{{ {macro}(model=ref('{model}'), {kwargs}) }}}}".format( + **{ + 'model': model_name, + 'macro': macro_name, + 'kwargs': ", ".join(kwargs) + } + ) + + base_path = test_base.get('path') + hashed_name, full_name = get_nice_schema_test_name(test_type, + model_name, + test_args) + + hashed_path = get_pseudo_test_path(hashed_name, base_path, + 'schema_test') + full_path = get_pseudo_test_path(full_name, base_path, + 'schema_test') + + # supply our own fqn which overrides the hashed version from the path + fqn_override = cls.get_fqn(full_path, package_project_config) + package_name = test_base.get('package_name') + node_path = cls.get_path(NodeType.Test, package_name, full_name) + + to_return = UnparsedNode( + name=full_name, + resource_type=test_base.get('resource_type'), + package_name=package_name, + root_path=test_base.get('root_path'), + path=hashed_path, + original_file_path=test_base.get('original_file_path'), + raw_sql=raw_sql + ) + + return cls.parse_node(to_return, + node_path, + root_project_config, + package_project_config, + all_projects, + tags=['schema'], + fqn_extra=None, + fqn=fqn_override, + macros=macros) + + @classmethod + def get_parsed_schema_test(cls, test_node, test_type, model_name, config, + root_project, projects, macros): + + package_name = test_node.get('package_name') + test_namespace = None + original_test_type = test_type + split = test_type.split('.') + + if len(split) > 1: + test_type = split[1] + package_name = split[0] + test_namespace = package_name + + source_package = projects.get(package_name) + if source_package is None: + desc = '"{}" test on model "{}"'.format(original_test_type, + model_name) + dbt.exceptions.raise_dep_not_found(test_node, desc, test_namespace) + + return cls.parse_schema_test( + test_node, + model_name, + config, + test_namespace, + test_type, + root_project, + source_package, + all_projects=projects, + macros=macros) + + @classmethod + def parse_schema_tests(cls, tests, root_project, projects, macros=None): + to_return = {} + + for test in tests: + raw_yml = test.get('raw_yml') + test_name = "{}:{}".format(test.get('package_name'), + test.get('path')) + + try: + test_yml = dbt.clients.yaml_helper.load_yaml_text(raw_yml) + except dbt.exceptions.ValidationException as e: + test_yml = None + logger.info("Error reading {} - Skipping\n{}".format( + test_name, e)) + + if test_yml is None: + continue + + no_tests_warning = ("* WARNING: No constraints found for model" + " '{}' in file {}\n") + for model_name, test_spec in test_yml.items(): + if test_spec is None or test_spec.get('constraints') is None: + test_path = test.get('original_file_path', '') + logger.warning(no_tests_warning.format(model_name, + test_path)) + continue + + constraints = test_spec.get('constraints', {}) + for test_type, configs in constraints.items(): + if configs is None: + continue + + if not isinstance(configs, (list, tuple)): + + dbt.utils.compiler_warning( + model_name, + "Invalid test config given in {} near {}".format( + test.get('path'), + configs)) + continue + + for config in configs: + to_add = cls.get_parsed_schema_test( + test, test_type, model_name, config, + root_project, projects, macros) + + if to_add is not None: + to_return[to_add.get('unique_id')] = to_add + + return to_return + + @classmethod + def load_and_parse(cls, package_name, root_project, all_projects, root_dir, + relative_dirs, macros=None): + extension = "[!.#~]*.yml" + + if dbt.flags.STRICT_MODE: + dbt.contracts.project.ProjectList(**all_projects) + + file_matches = dbt.clients.system.find_matching( + root_dir, + relative_dirs, + extension) + + result = [] + + for file_match in file_matches: + file_contents = dbt.clients.system.load_file_contents( + file_match.get('absolute_path'), strip=False) + + original_file_path = os.path.join(file_match.get('searched_path'), + file_match.get('relative_path')) + + parts = dbt.utils.split_path(file_match.get('relative_path', '')) + name, _ = os.path.splitext(parts[-1]) + + result.append({ + 'name': name, + 'root_path': root_dir, + 'resource_type': NodeType.Test, + 'path': file_match.get('relative_path'), + 'original_file_path': original_file_path, + 'package_name': package_name, + 'raw_yml': file_contents + }) + + return cls.parse_schema_tests(result, root_project, all_projects, + macros) diff --git a/dbt/parser/seeds.py b/dbt/parser/seeds.py new file mode 100644 index 00000000000..d2e4ab9e372 --- /dev/null +++ b/dbt/parser/seeds.py @@ -0,0 +1,70 @@ +import os + +import dbt.flags +import dbt.clients.agate_helper +import dbt.clients.system +import dbt.context.parser +import dbt.contracts.project +import dbt.exceptions + +from dbt.node_types import NodeType +from dbt.logger import GLOBAL_LOGGER as logger +from dbt.contracts.graph.unparsed import UnparsedNode +from dbt.parser.base import BaseParser + + +class SeedParser(BaseParser): + @classmethod + def parse_seed_file(cls, file_match, root_dir, package_name): + """Parse the given seed file, returning an UnparsedNode and the agate + table. + """ + abspath = file_match['absolute_path'] + logger.debug("Parsing {}".format(abspath)) + table_name = os.path.basename(abspath)[:-4] + node = UnparsedNode( + path=file_match['relative_path'], + name=table_name, + root_path=root_dir, + resource_type=NodeType.Seed, + # Give this raw_sql so it conforms to the node spec, + # use dummy text so it doesn't look like an empty node + raw_sql='-- csv --', + package_name=package_name, + original_file_path=os.path.join(file_match.get('searched_path'), + file_match.get('relative_path')), + ) + try: + table = dbt.clients.agate_helper.from_csv(abspath) + except ValueError as e: + dbt.exceptions.raise_compiler_error(str(e), node) + table.original_abspath = abspath + return node, table + + @classmethod + def load_and_parse(cls, package_name, root_project, all_projects, root_dir, + relative_dirs, tags=None, macros=None): + """Load and parse seed files in a list of directories. Returns a dict + that maps unique ids onto ParsedNodes""" + + extension = "[!.#~]*.csv" + if dbt.flags.STRICT_MODE: + dbt.contracts.project.ProjectList(**all_projects) + + file_matches = dbt.clients.system.find_matching( + root_dir, + relative_dirs, + extension) + + result = {} + for file_match in file_matches: + node, agate_table = cls.parse_seed_file(file_match, root_dir, + package_name) + node_path = cls.get_path(NodeType.Seed, package_name, node.name) + parsed = cls.parse_node(node, node_path, root_project, + all_projects.get(package_name), + all_projects, tags=tags, macros=macros, + agate_table=agate_table) + result[node_path] = parsed + + return result diff --git a/dbt/parser/util.py b/dbt/parser/util.py new file mode 100644 index 00000000000..9a09ec3b2c1 --- /dev/null +++ b/dbt/parser/util.py @@ -0,0 +1,79 @@ + +import dbt.utils + + +class ParserUtils(object): + @classmethod + def resolve_ref(cls, flat_graph, target_model_name, target_model_package, + current_project, node_package): + + if target_model_package is not None: + return dbt.utils.find_refable_by_name( + flat_graph, + target_model_name, + target_model_package) + + target_model = None + + # first pass: look for models in the current_project + target_model = dbt.utils.find_refable_by_name( + flat_graph, + target_model_name, + current_project) + + if target_model is not None and dbt.utils.is_enabled(target_model): + return target_model + + # second pass: look for models in the node's package + target_model = dbt.utils.find_refable_by_name( + flat_graph, + target_model_name, + node_package) + + if target_model is not None and dbt.utils.is_enabled(target_model): + return target_model + + # final pass: look for models in any package + # todo: exclude the packages we have already searched. overriding + # a package model in another package doesn't necessarily work atm + return dbt.utils.find_refable_by_name( + flat_graph, + target_model_name, + None) + + @classmethod + def process_refs(cls, manifest, current_project): + flat_graph = manifest.to_flat_graph() + for _, node in manifest.nodes.items(): + target_model = None + target_model_name = None + target_model_package = None + + for ref in node.get('refs', []): + if len(ref) == 1: + target_model_name = ref[0] + elif len(ref) == 2: + target_model_package, target_model_name = ref + + target_model = cls.resolve_ref( + flat_graph, + target_model_name, + target_model_package, + current_project, + node.get('package_name')) + + if target_model is None: + # This may raise. Even if it doesn't, we don't want to add + # this node to the graph b/c there is no destination node + node.get('config', {})['enabled'] = False + dbt.utils.invalid_ref_fail_unless_test( + node, target_model_name, target_model_package) + + continue + + target_model_id = target_model.get('unique_id') + + node['depends_on']['nodes'].append(target_model_id) + flat_graph['nodes'][node['unique_id']] = node + + return manifest diff --git a/test/unit/test_parser.py b/test/unit/test_parser.py index 0f1a9090f76..5b9eadcb5aa 100644 --- a/test/unit/test_parser.py +++ b/test/unit/test_parser.py @@ -3,7 +3,7 @@ import os import dbt.flags -import dbt.parser +from dbt.parser import ModelParser, MacroParser, DataTestParser, SchemaParser, ParserUtils from dbt.node_types import NodeType from dbt.contracts.graph.parsed import ParsedManifest, ParsedNode, ParsedMacro @@ -87,7 +87,7 @@ def test__single_model(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -145,7 +145,7 @@ def test__single_model__nested_configuration(self): }) self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -187,7 +187,7 @@ def test__empty_model(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config}), @@ -236,7 +236,7 @@ def test__simple_dependency(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -335,7 +335,7 @@ def test__multiple_dependencies(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -499,7 +499,7 @@ def test__multiple_dependencies__packages(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -686,7 +686,7 @@ def test__process_refs__packages(self): macros={k: ParsedMacro(**v) for (k,v) in graph['macros'].items()}, ) - processed_manifest = dbt.parser.process_refs(manifest, 'root') + processed_manifest = ParserUtils.process_refs(manifest, 'root') self.assertEquals( processed_manifest.to_flat_graph(), { @@ -776,7 +776,7 @@ def test__in_model_config(self): }) self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -858,7 +858,7 @@ def test__root_project_config(self): }) self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -1049,7 +1049,7 @@ def test__other_project_config(self): }) self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -1164,7 +1164,7 @@ def test__simple_schema_test(self): relationships_sql = "{{ test_relationships(model=ref('model_one'), field='id', from='id', to=ref('model_two')) }}" # noqa self.assertEquals( - dbt.parser.parse_schema_tests( + SchemaParser.parse_schema_tests( tests, self.root_project_config, {'root': self.root_project_config, @@ -1281,7 +1281,7 @@ def test__schema_test_with_comments(self): }] self.assertEquals( - dbt.parser.parse_schema_tests( + SchemaParser.parse_schema_tests( tests, self.root_project_config, {'root': self.root_project_config, @@ -1300,7 +1300,7 @@ def test__empty_schema_test(self): }] self.assertEquals( - dbt.parser.parse_schema_tests( + SchemaParser.parse_schema_tests( tests, self.root_project_config, {'root': self.root_project_config, @@ -1319,7 +1319,7 @@ def test__simple_data_test(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + DataTestParser.parse_sql_nodes( tests, self.root_project_config, {'root': self.root_project_config, @@ -1356,7 +1356,7 @@ def test__simple_macro(self): {% endmacro %} """ - result = dbt.parser.parse_macro_file( + result = MacroParser.parse_macro_file( macro_file_path='simple_macro.sql', macro_file_contents=macro_file_contents, root_path=get_os_path('/usr/src/app'), @@ -1392,7 +1392,7 @@ def test__simple_macro_used_in_model(self): {% endmacro %} """ - result = dbt.parser.parse_macro_file( + result = MacroParser.parse_macro_file( macro_file_path='simple_macro.sql', macro_file_contents=macro_file_contents, root_path=get_os_path('/usr/src/app'), @@ -1432,7 +1432,7 @@ def test__simple_macro_used_in_model(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config, @@ -1474,7 +1474,7 @@ def test__macro_no_explicit_project_used_in_model(self): }] self.assertEquals( - dbt.parser.parse_sql_nodes( + ModelParser.parse_sql_nodes( models, self.root_project_config, {'root': self.root_project_config,