Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task call sites part 1 #4183

Merged
merged 18 commits into from
Nov 2, 2021
Merged
408 changes: 396 additions & 12 deletions core/dbt/events/types.py

Large diffs are not rendered by default.

71 changes: 23 additions & 48 deletions core/dbt/task/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import os
import threading
import time
import traceback
from abc import ABCMeta, abstractmethod
from typing import Type, Union, Dict, Any, Optional

from dbt import tracking
from dbt import ui
from dbt import flags
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.results import (
Expand All @@ -16,7 +14,14 @@
NotImplementedException, CompilationException, RuntimeException,
InternalException
)
from dbt.logger import GLOBAL_LOGGER as logger, log_manager
from dbt.logger import log_manager
from dbt.events.functions import fire_event
from dbt.events.types import (
DbtProjectError, DbtProjectErrorException, DbtProfileError, DbtProfileErrorException,
ProfileListTitle, ListSingleProfile, NoDefinedProfiles, ProfileHelpMessage,
CatchableExceptionOnRun, InternalExceptionOnRun, GenericExceptionOnRun,
NodeConnectionReleaseError, PrintDebugStackTrace
)
from .printer import print_skip_caused_by_error, print_skip_line


Expand Down Expand Up @@ -47,13 +52,6 @@ def read_profiles(profiles_dir=None):
return profiles


PROFILES_HELP_MESSAGE = """
For more information on configuring profiles, please consult the dbt docs:

https://docs.getdbt.com/docs/configure-your-profile
"""


class BaseTask(metaclass=ABCMeta):
ConfigType: Union[Type[NoneConfig], Type[Project]] = NoneConfig

Expand Down Expand Up @@ -82,28 +80,27 @@ def from_args(cls, args):
try:
config = cls.ConfigType.from_args(args)
except dbt.exceptions.DbtProjectError as exc:
logger.error("Encountered an error while reading the project:")
logger.error(" ERROR: {}".format(str(exc)))
fire_event(DbtProjectError())
fire_event(DbtProjectErrorException(exc=exc))

tracking.track_invalid_invocation(
args=args,
result_type=exc.result_type)
raise dbt.exceptions.RuntimeException('Could not run dbt') from exc
except dbt.exceptions.DbtProfileError as exc:
logger.error("Encountered an error while reading profiles:")
logger.error(" ERROR {}".format(str(exc)))
fire_event(DbtProfileError())
fire_event(DbtProfileErrorException(exc=exc))

all_profiles = read_profiles(flags.PROFILES_DIR).keys()

if len(all_profiles) > 0:
logger.info("Defined profiles:")
fire_event(ProfileListTitle())
for profile in all_profiles:
logger.info(" - {}".format(profile))
fire_event(ListSingleProfile(profile=profile))
else:
logger.info("There are no profiles defined in your "
"profiles.yml file")
fire_event(NoDefinedProfiles())

logger.info(PROFILES_HELP_MESSAGE)
fire_event(ProfileHelpMessage())

tracking.track_invalid_invocation(
args=args,
Expand Down Expand Up @@ -165,11 +162,6 @@ def from_args(cls, args):
return super().from_args(args)


INTERNAL_ERROR_STRING = """This is an error in dbt. Please try again. If \
the error persists, open an issue at https://github.com/dbt-labs/dbt-core
""".strip()


class ExecutionContext:
"""During execution and error handling, dbt makes use of mutable state:
timing information and the newest (compiled vs executed) form of the node.
Expand Down Expand Up @@ -307,33 +299,19 @@ def _handle_catchable_exception(self, e, ctx):
if e.node is None:
e.add_node(ctx.node)

logger.debug(str(e), exc_info=True)
fire_event(CatchableExceptionOnRun(exc=e))
return str(e)

def _handle_internal_exception(self, e, ctx):
build_path = self.node.build_path
prefix = 'Internal error executing {}'.format(build_path)

error = "{prefix}\n{error}\n\n{note}".format(
prefix=ui.red(prefix),
error=str(e).strip(),
note=INTERNAL_ERROR_STRING
)
logger.debug(error, exc_info=True)
fire_event(InternalExceptionOnRun(build_path=self.node.build_path, exc=e))
return str(e)

def _handle_generic_exception(self, e, ctx):
node_description = self.node.build_path
if node_description is None:
node_description = self.node.unique_id
prefix = "Unhandled error while executing {}".format(node_description)
error = "{prefix}\n{error}".format(
prefix=ui.red(prefix),
error=str(e).strip()
)
fire_event(GenericExceptionOnRun(build_path=self.node.build_path,
unique_id=self.node.unique_id,
exc=e))
fire_event(PrintDebugStackTrace())

logger.error(error)
logger.debug('', exc_info=True)
return str(e)

def handle_exception(self, e, ctx):
Expand Down Expand Up @@ -383,10 +361,7 @@ def _safe_release_connection(self):
try:
self.adapter.release_connection()
except Exception as exc:
logger.debug(
'Error releasing connection for node {}: {!s}\n{}'
.format(self.node.name, exc, traceback.format_exc())
)
fire_event(NodeConnectionReleaseError(node_name=self.node.name, exc=exc))
return str(exc)

return None
Expand Down
15 changes: 9 additions & 6 deletions core/dbt/task/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

from dbt import deprecations
from dbt.task.base import BaseTask, move_to_nearest_project_dir
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import (
CheckCleanPath, ConfirmCleanPath, ProtectedCleanPath, FinishedCleanPaths
)
from dbt.config import UnsetProfileConfig


Expand Down Expand Up @@ -38,11 +41,11 @@ def run(self):
self.config.packages_install_path != 'dbt_modules'):
deprecations.warn('install-packages-path')
for path in self.config.clean_targets:
logger.info("Checking {}/*".format(path))
fire_event(CheckCleanPath(path=path))
if not self.__is_protected_path(path):
shutil.rmtree(path, True)
logger.info(" Cleaned {}/*".format(path))
fire_event(ConfirmCleanPath(path=path))
else:
logger.info("ERROR: not cleaning {}/* because it is "
"protected".format(path))
logger.info("Finished cleaning all paths.")
fire_event(ProtectedCleanPath(path=path))

fire_event(FinishedCleanPaths())
6 changes: 4 additions & 2 deletions core/dbt/task/compile.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import threading

from .runnable import GraphRunnableTask
from .base import BaseRunner

from dbt.contracts.results import RunStatus, RunResult
from dbt.exceptions import InternalException
from dbt.graph import ResourceTypeSelector
from dbt.logger import print_timestamped_line
from dbt.events.functions import fire_event
from dbt.events.types import CompileComplete
from dbt.node_types import NodeType


Expand Down Expand Up @@ -53,4 +55,4 @@ def get_runner_type(self, _):
return CompileRunner

def task_end_messages(self, results):
print_timestamped_line('Done.')
fire_event(CompileComplete())
11 changes: 3 additions & 8 deletions core/dbt/task/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import sys
from typing import Optional, Dict, Any, List

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import OpenCommand
from dbt import flags
import dbt.clients.system
import dbt.exceptions
Expand Down Expand Up @@ -102,13 +103,7 @@ def project_profile(self):

def path_info(self):
open_cmd = dbt.clients.system.open_dir_cmd()

message = PROFILE_DIR_MESSAGE.format(
open_cmd=open_cmd,
profiles_dir=self.profiles_dir
)

logger.info(message)
fire_event(OpenCommand(open_cmd=open_cmd, profiles_dir=self.profiles_dir))

def run(self):
if self.args.config_dir:
Expand Down
25 changes: 12 additions & 13 deletions core/dbt/task/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from dbt.deps.base import downloads_directory
from dbt.deps.resolver import resolve_packages

from dbt.logger import GLOBAL_LOGGER as logger
from dbt.events.functions import fire_event
from dbt.events.types import (
DepsNoPackagesFound, DepsStartPackageInstall, DepsUpdateAvailable, DepsUTD,
DepsInstallInfo, DepsListSubdirectory, DepsNotifyUpdatesAvailable
)
from dbt.clients import system

from dbt.task.base import BaseTask, move_to_nearest_project_dir
Expand Down Expand Up @@ -46,7 +50,7 @@ def run(self):
system.make_directory(self.config.packages_install_path)
packages = self.config.packages.packages
if not packages:
logger.info('Warning: No packages were found in packages.yml')
fire_event(DepsNoPackagesFound())
return

with downloads_directory():
Expand All @@ -62,30 +66,25 @@ def run(self):
source_type = package.source_type()
version = package.get_version()

logger.info('Installing {}', package)
fire_event(DepsStartPackageInstall(package=package))
package.install(self.config, renderer)
logger.info(' Installed from {}',
package.nice_version_name())
fire_event(DepsInstallInfo(version_name=package.nice_version_name()))
if source_type == 'hub':
version_latest = package.get_version_latest()
if version_latest != version:
packages_to_upgrade.append(package_name)
logger.info(' Updated version available: {}',
version_latest)
fire_event(DepsUpdateAvailable(version_latest=version_latest))
else:
logger.info(' Up to date!')
fire_event(DepsUTD())
if package.get_subdirectory():
logger.info(' and subdirectory {}',
package.get_subdirectory())
fire_event(DepsListSubdirectory(subdirectory=package.get_subdirectory()))

self.track_package_install(
package_name=package_name,
source_type=source_type,
version=version)
if packages_to_upgrade:
logger.info('\nUpdates available for packages: {} \
\nUpdate your versions in packages.yml, then run dbt deps',
packages_to_upgrade)
fire_event(DepsNotifyUpdatesAvailable(packages=packages_to_upgrade))

@classmethod
def from_args(cls, args):
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
SourceFreshnessResult, FreshnessStatus
)
from dbt.exceptions import RuntimeException, InternalException
from dbt.logger import print_timestamped_line
from dbt.events.functions import fire_event
from dbt.events.types import FreshnessCheckComplete
from dbt.node_types import NodeType

from dbt.graph import ResourceTypeSelector
Expand Down Expand Up @@ -170,4 +171,4 @@ def task_end_messages(self, results):
):
print_run_result_error(result)

print_timestamped_line('Done.')
fire_event(FreshnessCheckComplete())
22 changes: 8 additions & 14 deletions core/dbt/task/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
)
from dbt.exceptions import InternalException
from dbt.include.global_project import DOCS_INDEX_FILE_PATH
from dbt.logger import GLOBAL_LOGGER as logger, print_timestamped_line
from dbt.events.functions import fire_event
from dbt.events.types import (
WriteCatalogFailure, CatalogWritten, CannotGenerateDocs, BuildingCatalog
)
from dbt.parser.manifest import ManifestLoader
import dbt.utils
import dbt.compilation
Expand Down Expand Up @@ -203,9 +206,7 @@ def run(self) -> CatalogArtifact:
if self.args.compile:
compile_results = CompileTask.run(self)
if any(r.status == NodeStatus.Error for r in compile_results):
print_timestamped_line(
'compile failed, cannot generate docs'
)
fire_event(CannotGenerateDocs())
return CatalogArtifact.from_results(
nodes={},
sources={},
Expand Down Expand Up @@ -238,7 +239,7 @@ def run(self) -> CatalogArtifact:

adapter = get_adapter(self.config)
with adapter.connection_named('generate_catalog'):
print_timestamped_line("Building catalog")
fire_event(BuildingCatalog())
catalog_table, exceptions = adapter.get_catalog(self.manifest)

catalog_data: List[PrimitiveDict] = [
Expand Down Expand Up @@ -267,15 +268,8 @@ def run(self) -> CatalogArtifact:
self.write_manifest()

if exceptions:
logger.error(
'dbt encountered {} failure{} while writing the catalog'
.format(len(exceptions), (len(exceptions) != 1) * 's')
)

print_timestamped_line(
'Catalog written to {}'.format(os.path.abspath(path))
)

fire_event(WriteCatalogFailure(num_exceptions=len(exceptions)))
fire_event(CatalogWritten(path=os.path.abspath(path)))
return results

def get_catalog_results(
Expand Down
Loading