From 6649840b46f9f7d165346fbdac0ed1ac1a5f03d4 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Thu, 22 Aug 2019 16:40:30 -0600 Subject: [PATCH 1/3] make mypy happy with deprecations Requires git-only hologram branch TODO: update hologram version + do another release --- core/dbt/contracts/connection.py | 8 +-- core/dbt/contracts/graph/parsed.py | 9 ++- core/dbt/contracts/project.py | 12 ++-- core/dbt/deprecations.py | 55 ++++++++++++++----- core/setup.py | 2 +- .../snowplow_tracker/__init__.pyi | 12 ---- tox.ini | 1 + 7 files changed, 59 insertions(+), 40 deletions(-) diff --git a/core/dbt/contracts/connection.py b/core/dbt/contracts/connection.py index b1ca4ddbc49..d99dc260037 100644 --- a/core/dbt/contracts/connection.py +++ b/core/dbt/contracts/connection.py @@ -1,12 +1,12 @@ -from hologram.helpers import StrEnum, NewPatternType, ExtensibleJsonSchemaMixin +from hologram.helpers import StrEnum, register_pattern, ExtensibleJsonSchemaMixin from hologram import JsonSchemaMixin from dbt.contracts.util import Replaceable from dataclasses import dataclass -from typing import Any, Optional +from typing import Any, Optional, NewType - -Identifier = NewPatternType('Identifier', r'^[A-Za-z_][A-Za-z0-9_]+$') +Identifier = NewType('Identifier', str) +register_pattern(Identifier, r'^[A-Za-z_][A-Za-z0-9_]+$') class ConnectionState(StrEnum): diff --git a/core/dbt/contracts/graph/parsed.py b/core/dbt/contracts/graph/parsed.py index 50a7cb123b8..a656bd626f0 100644 --- a/core/dbt/contracts/graph/parsed.py +++ b/core/dbt/contracts/graph/parsed.py @@ -1,8 +1,10 @@ from dataclasses import dataclass, field -from typing import Optional, Union, List, Dict, Any, Type, Tuple +from typing import Optional, Union, List, Dict, Any, Type, Tuple, NewType from hologram import JsonSchemaMixin -from hologram.helpers import StrEnum, NewPatternType, ExtensibleJsonSchemaMixin +from hologram.helpers import ( + StrEnum, register_pattern, ExtensibleJsonSchemaMixin +) import dbt.clients.jinja import dbt.flags @@ -46,7 +48,8 @@ def insensitive_patterns(*patterns: str): return '^({})$'.format('|'.join(lowercased)) -Severity = NewPatternType('Severity', insensitive_patterns('warn', 'error')) +Severity = NewType('Severity', str) +register_pattern(Severity, insensitive_patterns('warn', 'error')) @dataclass diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index e8ed83d53b1..861d66a3d59 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -5,24 +5,26 @@ # from dbt.utils import JSONEncoder from hologram import JsonSchemaMixin -from hologram.helpers import HyphenatedJsonSchemaMixin, NewPatternType, \ +from hologram.helpers import HyphenatedJsonSchemaMixin, register_pattern, \ ExtensibleJsonSchemaMixin from dataclasses import dataclass, field -from typing import Optional, List, Dict, Union, Any +from typing import Optional, List, Dict, Union, Any, NewType PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa DEFAULT_SEND_ANONYMOUS_USAGE_STATS = True DEFAULT_USE_COLORS = True -Name = NewPatternType('Name', r'^[^\d\W]\w*\Z') +Name = NewType('Name', str) +register_pattern(Name, r'^[^\d\W]\w*\Z') # this does not support the full semver (does not allow a trailing -fooXYZ) and # is not restrictive enough for full semver, (allows '1.0'). But it's like # 'semver lite'. 
-SemverString = NewPatternType( - 'SemverString', +SemverString = NewType('SemverString', str) +register_pattern( + SemverString, r'^(?:0|[1-9]\d*)\.(?:0|[1-9]\d*)(\.(?:0|[1-9]\d*))?$', ) diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py index ff07f360a55..790b670b181 100644 --- a/core/dbt/deprecations.py +++ b/core/dbt/deprecations.py @@ -1,12 +1,31 @@ +from typing import Optional, Set, List, Dict, ClassVar + import dbt.links +import dbt.exceptions import dbt.flags class DBTDeprecation: - name = None - description = None + _name: ClassVar[Optional[str]] = None + _description: ClassVar[Optional[str]] = None + + @property + def name(self) -> str: + if self._name is not None: + return self._name + raise NotImplementedError( + 'name not implemented for {}'.format(self) + ) + + @property + def description(self) -> str: + if self._description is not None: + return self._description + raise NotImplementedError( + 'description not implemented for {}'.format(self) + ) - def show(self, *args, **kwargs): + def show(self, *args, **kwargs) -> None: if self.name not in active_deprecations: desc = self.description.format(**kwargs) dbt.exceptions.warn_or_error( @@ -16,8 +35,10 @@ def show(self, *args, **kwargs): class DBTRepositoriesDeprecation(DBTDeprecation): - name = "repositories" - description = """The dbt_project.yml configuration option 'repositories' is + _name = "repositories" + + _description = """ + The dbt_project.yml configuration option 'repositories' is deprecated. Please place dependencies in the `packages.yml` file instead. The 'repositories' option will be removed in a future version of dbt. @@ -26,12 +47,13 @@ class DBTRepositoriesDeprecation(DBTDeprecation): # Example packages.yml contents: {recommendation} - """ + """.lstrip() class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation): - name = 'generate-schema-name-single-arg' - description = '''As of dbt v0.14.0, the `generate_schema_name` macro + _name = 'generate-schema-name-single-arg' + + _description = '''As of dbt v0.14.0, the `generate_schema_name` macro accepts a second "node" argument. The one-argument form of `generate_schema_name` is deprecated, and will become unsupported in a future release. 
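
Editor's note on the deprecations hunk above: the class-level `name`/`description` attributes become private `ClassVar`s backed by properties, so mypy can treat both as `str` while a subclass that forgets to set them fails loudly instead of silently yielding None. A minimal sketch of how a concrete deprecation looks under this scheme — `ExampleDeprecation` and its message are illustrative only, not classes added by this patch:

    # Illustrative only: a hypothetical subclass using the new ClassVar/property pattern.
    import dbt.deprecations as deprecations

    class ExampleDeprecation(deprecations.DBTDeprecation):
        _name = 'example-feature'
        _description = 'The example feature is deprecated and will be removed.'

    dep = ExampleDeprecation()
    assert dep.name == 'example-feature'      # property falls through to _name
    assert 'deprecated' in dep.description    # and to _description

    class BrokenDeprecation(deprecations.DBTDeprecation):
        pass                                  # _name / _description never set

    try:
        BrokenDeprecation().name              # now raises instead of returning None
    except NotImplementedError:
        pass
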
@@ -47,11 +69,12 @@ class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation): https://docs.getdbt.com/docs/adapter""" -def renamed_method(old_name, new_name): +def renamed_method(old_name: str, new_name: str): + class AdapterDeprecationWarning(DBTDeprecation): - name = 'adapter:{}'.format(old_name) - description = _adapter_renamed_description.format(old_name=old_name, - new_name=new_name) + _name = 'adapter:{}'.format(old_name) + _description = _adapter_renamed_description.format(old_name=old_name, + new_name=new_name) dep = AdapterDeprecationWarning() deprecations_list.append(dep) @@ -71,14 +94,16 @@ def warn(name, *args, **kwargs): # these are globally available # since modules are only imported once, active_deprecations is a singleton -active_deprecations = set() +active_deprecations: Set[str] = set() -deprecations_list = [ +deprecations_list: List[DBTDeprecation] = [ DBTRepositoriesDeprecation(), GenerateSchemaNameSingleArgDeprecated(), ] -deprecations = {d.name: d for d in deprecations_list} +deprecations: Dict[str, DBTDeprecation] = { + d.name: d for d in deprecations_list +} def reset_deprecations(): diff --git a/core/setup.py b/core/setup.py index f4dec25f917..9bee88448b4 100644 --- a/core/setup.py +++ b/core/setup.py @@ -56,6 +56,6 @@ def read(fname): 'json-rpc>=1.12,<2', 'werkzeug>=0.14.1,<0.15', 'dataclasses;python_version<"3.7"', - 'hologram==0.0.1', + 'hologram==0.0.2', ] ) diff --git a/third-party-stubs/snowplow_tracker/__init__.pyi b/third-party-stubs/snowplow_tracker/__init__.pyi index 6c88a057f94..5dff3636134 100644 --- a/third-party-stubs/snowplow_tracker/__init__.pyi +++ b/third-party-stubs/snowplow_tracker/__init__.pyi @@ -31,18 +31,6 @@ class Tracker: encode_base64: bool = ... def __init__(self, emitters: Union[List[Any], Any], subject: Optional[Subject] = ..., namespace: Optional[str] = ..., app_id: Optional[str] = ..., encode_base64: bool = ...) -> None: ... - # @staticmethod - # def get_uuid(): ... - # @staticmethod - # def get_timestamp(tstamp: Optional[Any] = ...): ... - # def track(self, pb: Any): ... - # def complete_payload(self, pb: Any, context: Any, tstamp: Any): ... - # def track_struct_event(self, category: Any, action: Any, label: Optional[Any] = ..., property_: Optional[Any] = ..., value: Optional[Any] = ..., context: Optional[Any] = ..., tstamp: Optional[Any] = ...): ... - # def track_unstruct_event(self, event_json: Any, context: Optional[Any] = ..., tstamp: Optional[Any] = ...): ... - # track_self_describing_event: Any = ... - # def flush(self, asynchronous: bool = ...): ... - # def set_subject(self, subject: Any): ... - # def add_emitter(self, emitter: Any): ... 
class SelfDescribingJson: diff --git a/tox.ini b/tox.ini index 05e0881cad7..6a0b98591c7 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,7 @@ basepython = python3.6 commands = /bin/bash -c '$(which mypy) --namespace-packages \ core/dbt/clients \ core/dbt/config \ + core/dbt/deprecations.py \ core/dbt/exceptions.py \ core/dbt/flags.py \ core/dbt/helper_types.py \ From 9581f391867027461c49ac189ae0055b9a83b152 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Fri, 23 Aug 2019 09:11:38 -0600 Subject: [PATCH 2/3] Trigger rpc server reloads with sighup From the sighup until completion, the sever enters an error state where only builtin commands work - management of tasks should keep functioning properly Add new builtin command 'status' - returns 'pid', 'status' enum, 'timestamp' - if the status is 'error', an 'error' field is populated - just a Dict[str, Any] with, only 'message' for now --- core/dbt/contracts/connection.py | 4 +- core/dbt/deprecations.py | 2 +- core/dbt/rpc.py | 85 +++++++++++-- core/dbt/task/rpc_server.py | 114 ++++++++++++++++-- .../010_permission_tests/test_permissions.py | 18 ++- .../042_sources_test/test_sources.py | 104 ++++++++++++++-- 6 files changed, 289 insertions(+), 38 deletions(-) diff --git a/core/dbt/contracts/connection.py b/core/dbt/contracts/connection.py index d99dc260037..2ed88ae38c1 100644 --- a/core/dbt/contracts/connection.py +++ b/core/dbt/contracts/connection.py @@ -1,4 +1,6 @@ -from hologram.helpers import StrEnum, register_pattern, ExtensibleJsonSchemaMixin +from hologram.helpers import ( + StrEnum, register_pattern, ExtensibleJsonSchemaMixin +) from hologram import JsonSchemaMixin from dbt.contracts.util import Replaceable diff --git a/core/dbt/deprecations.py b/core/dbt/deprecations.py index 790b670b181..e4daa4ca2bd 100644 --- a/core/dbt/deprecations.py +++ b/core/dbt/deprecations.py @@ -102,7 +102,7 @@ def warn(name, *args, **kwargs): ] deprecations: Dict[str, DBTDeprecation] = { - d.name: d for d in deprecations_list + d.name: d for d in deprecations_list } diff --git a/core/dbt/rpc.py b/core/dbt/rpc.py index 0a960c234b5..6e8f8bf1e0c 100644 --- a/core/dbt/rpc.py +++ b/core/dbt/rpc.py @@ -1,3 +1,5 @@ +from hologram import JsonSchemaMixin +from hologram.helpers import StrEnum from jsonrpc.exceptions import \ JSONRPCDispatchException, \ JSONRPCInvalidParams, \ @@ -8,7 +10,10 @@ from jsonrpc.jsonrpc import JSONRPCRequest from jsonrpc.jsonrpc2 import JSONRPC20Response +from dataclasses import dataclass, field +from datetime import datetime from enum import Enum +from typing import Any, Dict, Optional import json import multiprocessing import os @@ -139,6 +144,8 @@ def __getitem__(self, key): return func task = self.manager.rpc_task(key) + if task is None: + raise KeyError(key) return self.rpc_factory(task) @@ -319,22 +326,76 @@ def state(self): ) +class ManifestStatus(StrEnum): + Init = 'init' + Compiling = 'compiling' + Ready = 'ready' + Error = 'error' + + +@dataclass +class LastCompile(JsonSchemaMixin): + status: ManifestStatus + error: Optional[Dict[str, Any]] = None + timestamp: datetime = field(default_factory=datetime.utcnow) + + class TaskManager: - def __init__(self): + def __init__(self, args, config): + self.args = args + self.config = config self.tasks = {} self.completed = {} self._rpc_task_map = {} - self._rpc_function_map = {} + self._last_compile = LastCompile(status=ManifestStatus.Compiling) self._lock = multiprocessing.Lock() def add_request(self, request_handler): self.tasks[request_handler.task_id] = request_handler - def 
add_task_handler(self, task): - self._rpc_task_map[task.METHOD_NAME] = task + def add_task_handler(self, task, manifest): + self._rpc_task_map[task.METHOD_NAME] = task( + self.args, self.config, manifest + ) def rpc_task(self, method_name): - return self._rpc_task_map[method_name] + with self._lock: + if self._last_compile.status == ManifestStatus.Ready: + return self._rpc_task_map[method_name] + else: + return None + + def ready(self): + with self._lock: + return self._last_compile.status == ManifestStatus.Ready + + def set_compiling(self): + assert self._last_compile.status != ManifestStatus.Compiling, \ + f'invalid state {self._last_compile.status}' + with self._lock: + self._last_compile = LastCompile(status=ManifestStatus.Compiling) + self._rpc_task_map.clear() + + def set_compile_exception(self, exc): + assert self._last_compile.status == ManifestStatus.Compiling, \ + f'invalid state {self._last_compile.status}' + self._last_compile = LastCompile( + error={'message': str(exc)}, + status=ManifestStatus.Error + ) + + def set_ready(self): + assert self._last_compile.status == ManifestStatus.Compiling, \ + f'invalid state {self._last_compile.status}' + self._last_compile = LastCompile(status=ManifestStatus.Ready) + + def process_status(self): + with self._lock: + last_compile = self._last_compile + + status = last_compile.to_dict() + status['pid'] = os.getpid() + return status def process_listing(self, active=True, completed=False): included_tasks = {} @@ -400,6 +461,8 @@ def rpc_builtin(self, method_name): return self.process_listing if method_name == 'kill' and os.name != 'nt': return self.process_kill + if method_name == 'status': + return self.process_status return None def mark_done(self, request_handler): @@ -411,10 +474,17 @@ def mark_done(self, request_handler): self.completed[task_id] = self.tasks.pop(task_id) def methods(self): - rpc_builtin_methods = ['ps'] + rpc_builtin_methods = ['ps', 'status'] if os.name != 'nt': rpc_builtin_methods.append('kill') - return list(self._rpc_task_map) + rpc_builtin_methods + + with self._lock: + if not self._last_compile == ManifestStatus.Ready: + task_map = [] + else: + task_map = list(self._rpc_task_map) + + return task_map + rpc_builtin_methods class ResponseManager(JSONRPCResponseManager): @@ -440,6 +510,7 @@ def handle(cls, http_request, task_manager): return JSONRPC20Response(error=JSONRPCInvalidRequest()._data) track_rpc_request(request.method) + dispatcher = RequestDispatcher( http_request, request, diff --git a/core/dbt/task/rpc_server.py b/core/dbt/task/rpc_server.py index 506f798f020..a11968555aa 100644 --- a/core/dbt/task/rpc_server.py +++ b/core/dbt/task/rpc_server.py @@ -1,4 +1,8 @@ import json +import os +import signal +import threading +from contextlib import contextmanager from werkzeug.wsgi import DispatcherMiddleware from werkzeug.wrappers import Request, Response @@ -17,25 +21,108 @@ from dbt import rpc +# SIG_DFL ends up killing the process if multiple build up, but SIG_IGN just +# peacefully carries on +SIG_IGN = signal.SIG_IGN + + +def reload_manager(task_manager, tasks): + try: + compile_task = CompileTask(task_manager.args, task_manager.config) + compile_task.run() + manifest = compile_task.manifest + + for cls in tasks: + task_manager.add_task_handler(cls, manifest) + except Exception as exc: + task_manager.set_compile_exception(exc) + else: + task_manager.set_ready() + + +@contextmanager +def signhup_replace(): + """Å context manager. 
Replace the current sighup handler with SIG_IGN on + entering, and (if the current handler was not SIG_IGN) replace it on + leaving. This is meant to be used inside a sighup handler itself to + provide. a sort of locking model. + + This relies on the fact that 1) signals are only handled by the main thread + (the default in Python) and 2) signal.signal() is "atomic" (only C + instructions). I'm pretty sure that's reliable on posix. + + This shouldn't replace if the handler wasn't already SIG_IGN, and should + yield whether it has the lock as its value. Callers shouldn't do + singal-handling things inside this context manager if it does not have the + lock (they should just exit the context). + """ + # Don't use locks here! This is called from inside a signal handler + + # set our handler to ignore signals, capturing the existing one + current_handler = signal.signal(signal.SIGHUP, SIG_IGN) + + # current_handler should be the handler unless we're already loading a + # new manifest. So if the current handler is the ignore, there was a + # double-hup! We should exit and not touch the signal handler, to make + # sure we let the other signal handler fix it + is_current_handler = current_handler is not SIG_IGN + + # if we got here, we're the ones in charge of configuring! Yield. + try: + yield is_current_handler + finally: + if is_current_handler: + # the signal handler that successfully changed the handler is + # responsible for resetting, and can't be re-called until it's + # fixed, so no locking needed + + signal.signal(signal.SIGHUP, current_handler) + + class RPCServerTask(ConfiguredTask): def __init__(self, args, config, tasks=None): super().__init__(args, config) - # compile locally - self.manifest = self._compile_manifest() - self.task_manager = rpc.TaskManager() - tasks = tasks or [ + self._tasks = tasks or self._default_tasks() + self.task_manager = rpc.TaskManager(self.args, self.config) + self._reloader = None + reload_manager(self.task_manager, self._tasks) + + # windows doesn't have SIGHUP so don't do sighup things + if os.name != 'nt': + signal.signal(signal.SIGHUP, self._sighup_handler) + + def _reload_task_manager(self): + """This function can only be running once at a time, as it runs in the + signal handler we replace + """ + # mark the task manager invalid for task running + self.task_manager.set_compiling() + # compile in a thread that will fix up the tag manager when it's done + reloader = threading.Thread( + target=reload_manager, + args=(self.task_manager, self._tasks), + ) + reloader.start() + # only assign to _reloader here, to avoid calling join() before start() + self._reloader = reloader + + def _sighup_handler(self, signum, frame): + with signhup_replace() as run_task_manger: + if not run_task_manger: + # a sighup handler is already active. + return + if self._reloader is not None and self._reloader.is_alive(): + # a reloader is already active. 
+ return + self._reload_task_manager() + + @staticmethod + def _default_tasks(): + return [ RemoteCompileTask, RemoteCompileProjectTask, RemoteRunTask, RemoteRunProjectTask, RemoteSeedProjectTask, RemoteTestProjectTask ] - for cls in tasks: - task = cls(args, config, self.manifest) - self.task_manager.add_task_handler(task) - - def _compile_manifest(self): - compile_task = CompileTask(self.args, self.config) - compile_task.run() - return compile_task.manifest def run(self): host = self.args.host @@ -47,7 +134,7 @@ def run(self): display_host = 'localhost' logger.info( - 'Serving RPC server at {}:{}'.format(*addr) + 'Serving RPC server at {}:{}, pid={}'.format(*addr, os.getpid()) ) logger.info( @@ -84,6 +171,7 @@ def handle_jsonrpc_request(self, request): logger.info('sending response ({}) to {}, data={}'.format( response, request.remote_addr, json.loads(json_data)) ) + logger.info('thread name: {}'.format(threading.current_thread().name)) return response @Request.application diff --git a/test/integration/010_permission_tests/test_permissions.py b/test/integration/010_permission_tests/test_permissions.py index 4179d48fbee..6409e16dd18 100644 --- a/test/integration/010_permission_tests/test_permissions.py +++ b/test/integration/010_permission_tests/test_permissions.py @@ -1,5 +1,16 @@ from test.integration.base import DBTIntegrationTest, use_profile +from pytest import mark + +# postgres sometimes fails with an internal error if you run these tests too close together. +def postgres_error(err, *args): + msg = str(err) + if 'tuple concurrently updated' in msg: + return True + return False + + +@mark.flaky(rerun_filter=postgres_error) class TestPermissions(DBTIntegrationTest): def setUp(self): @@ -17,14 +28,9 @@ def models(self): @use_profile('postgres') def test_no_create_schema_permissions(self): # the noaccess user does not have permissions to create a schema -- this should fail - failed = False self.run_sql('drop schema if exists "{}" cascade'.format(self.unique_schema())) - try: + with self.assertRaises(RuntimeError): self.run_dbt(['run', '--target', 'noaccess'], expect_pass=False) - except RuntimeError: - failed = True - - self.assertTrue(failed) @use_profile('postgres') def test_create_schema_permissions(self): diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index f7c72f09cc4..eee15ee335d 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -2,8 +2,8 @@ import multiprocessing import os import random +import signal import socket -import sys import time from base64 import standard_b64encode as b64 from datetime import datetime, timedelta @@ -419,8 +419,9 @@ def project_config(self): 'macro-paths': ['macros'], } - def build_query(self, method, kwargs, sql=None, test_request_id=1, - macros=None): + def build_query( + self, method, kwargs, sql=None, test_request_id=1, macros=None + ): body_data = '' if sql is not None: body_data += sql @@ -454,8 +455,9 @@ def handle_result(self, bg_query, pipe): else: return result - def background_query(self, _method, _sql=None, _test_request_id=1, - _block=False, macros=None, **kwargs): + def background_query( + self, _method, _sql=None, _test_request_id=1, _block=False, macros=None, **kwargs + ): built = self.build_query(_method, kwargs, _sql, _test_request_id, macros) @@ -480,8 +482,8 @@ def assertResultHasTimings(self, result, *names): datetime.strptime(timing['started_at'], '%Y-%m-%dT%H:%M:%S.%fZ') 
datetime.strptime(timing['completed_at'], '%Y-%m-%dT%H:%M:%S.%fZ') - def assertIsResult(self, data): - self.assertEqual(data['id'], 1) + def assertIsResult(self, data, id_=1): + self.assertEqual(data['id'], id_) self.assertEqual(data['jsonrpc'], '2.0') self.assertIn('result', data) self.assertNotIn('error', data) @@ -807,11 +809,10 @@ def kill_and_assert(self, pg_sleeper, task_id, request_id): self.assertIn('logs', error_data) return error_data - def _get_sleep_query(self): - request_id = 90890 + def _get_sleep_query(self, request_id=90890, duration=15): pg_sleeper = self.background_query( 'run', - 'select pg_sleep(15)', + 'select pg_sleep({})'.format(duration), _test_request_id=request_id, name='sleeper', ) @@ -976,3 +977,86 @@ def test_test_project_postgres(self): for result in results: self.assertEqual(result['status'], 0.0) self.assertNotIn('fail', result) + + def _wait_for_running(self, timeout=15, raise_on_timeout=True): + started = time.time() + time.sleep(0.5) + elapsed = time.time() - started + + while elapsed < timeout: + status = self.assertIsResult(self.query('status').json()) + if status['status'] == 'running': + return status + time.sleep(0.5) + elapsed = time.time() - started + + status = self.assertIsResult(self.query('status').json()) + if raise_on_timeout: + self.assertEqual( + status['status'], + 'ready', + f'exceeded max time of {timeout}: {elapsed} seconds elapsed' + ) + return status + + def assertRunning(self, sleepers): + sleeper_ps_result = self.query('ps', completed=False, active=True).json() + result = self.assertIsResult(sleeper_ps_result) + self.assertEqual(len(result['rows']), len(sleepers)) + result_map = {rd['request_id']: rd for rd in result['rows']} + for _, _, request_id in sleepers: + found = result_map[request_id] + self.assertEqual(found['request_id'], request_id) + self.assertEqual(found['method'], 'run') + self.assertEqual(found['state'], 'running') + self.assertEqual(found['timeout'], None) + + def _add_command(self, cmd, id_): + self.assertIsResult(self.query(cmd, _test_request_id=id_).json(), id_=id_) + + @mark.skipif(os.name == 'nt', reason='"sighup" not supported on windows') + @mark.flaky(rerun_filter=lambda *a, **kw: True) + @use_profile('postgres') + def test_sighup_postgres(self): + status = self.assertIsResult(self.query('status').json()) + self.assertEqual(status['status'], 'ready') + started_at = status['timestamp'] + + done_query = self.query('compile', 'select 1 as id', name='done').json() + self.assertIsResult(done_query) + sleepers = [] + command_ids = [] + + sleepers.append(self._get_sleep_query(1000, duration=60)) + self.assertRunning(sleepers) + + self._add_command('seed_project', 20) + command_ids.append(20) + self._add_command('run_project', 21) + command_ids.append(21) + + # sighup a few times + for _ in range(10): + os.kill(status['pid'], signal.SIGHUP) + + status = self._wait_for_running() + + # we should still still see our service: + self.assertRunning(sleepers) + + self._add_command('seed_project', 30) + command_ids.append(30) + self._add_command('run_project', 31) + command_ids.append(31) + + # start a new one too + sleepers.append(self._get_sleep_query(1001, duration=60)) + + # now we should see both + self.assertRunning(sleepers) + + # now pluck out the oldest one and kill it + dead, alive = sleepers + self.kill_and_assert(*dead) + self.assertRunning([alive]) + self.kill_and_assert(*alive) From bd63aac8b351cf47fa3372fb0f9388dce00dbfad Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Mon, 26 Aug 2019 14:44:16 -0600 
Subject: [PATCH 3/3] PR feedback --- core/dbt/exceptions.py | 27 + core/dbt/rpc.py | 29 +- core/dbt/task/rpc_server.py | 6 +- .../042_sources_test/test_sources.py | 752 ----------------- .../data/expected_multi_source.csv | 4 + .../048_rpc_test/data/other_source_table.csv | 4 + .../048_rpc_test/data/other_table.csv | 4 + test/integration/048_rpc_test/data/source.csv | 101 +++ .../048_rpc_test/error_models/model.sql | 1 + .../048_rpc_test/error_models/schema.yml | 12 + .../integration/048_rpc_test/macros/macro.sql | 14 + .../malformed_models/descendant_model.sql | 1 + .../048_rpc_test/malformed_models/schema.yml | 14 + .../048_rpc_test/models/descendant_model.sql | 1 + .../048_rpc_test/models/ephemeral_model.sql | 3 + .../models/multi_source_model.sql | 2 + .../models/nonsource_descendant.sql | 1 + .../048_rpc_test/models/schema.yml | 63 ++ test/integration/048_rpc_test/seed.sql | 113 +++ test/integration/048_rpc_test/test_rpc.py | 787 ++++++++++++++++++ 20 files changed, 1175 insertions(+), 764 deletions(-) create mode 100644 test/integration/048_rpc_test/data/expected_multi_source.csv create mode 100644 test/integration/048_rpc_test/data/other_source_table.csv create mode 100644 test/integration/048_rpc_test/data/other_table.csv create mode 100644 test/integration/048_rpc_test/data/source.csv create mode 100644 test/integration/048_rpc_test/error_models/model.sql create mode 100644 test/integration/048_rpc_test/error_models/schema.yml create mode 100644 test/integration/048_rpc_test/macros/macro.sql create mode 100644 test/integration/048_rpc_test/malformed_models/descendant_model.sql create mode 100644 test/integration/048_rpc_test/malformed_models/schema.yml create mode 100644 test/integration/048_rpc_test/models/descendant_model.sql create mode 100644 test/integration/048_rpc_test/models/ephemeral_model.sql create mode 100644 test/integration/048_rpc_test/models/multi_source_model.sql create mode 100644 test/integration/048_rpc_test/models/nonsource_descendant.sql create mode 100644 test/integration/048_rpc_test/models/schema.yml create mode 100644 test/integration/048_rpc_test/seed.sql create mode 100644 test/integration/048_rpc_test/test_rpc.py diff --git a/core/dbt/exceptions.py b/core/dbt/exceptions.py index d8521292166..33759f3ab3b 100644 --- a/core/dbt/exceptions.py +++ b/core/dbt/exceptions.py @@ -162,6 +162,33 @@ def data(self): } +class RPCCompiling(RuntimeException): + CODE = 10010 + MESSAGE = ( + 'RPC server is compiling the project, call the "status" method for' + ' compile status' + ) + + +class RPCLoadException(RuntimeException): + CODE = 10011 + MESSAGE = ( + 'RPC server failed to compile project, call the "status" method for' + ' compile status' + ) + + def __init__(self, cause): + self.cause = cause + self.message = '{}: {}'.format(self.MESSAGE, self.cause['message']) + super().__init__(self.message) + + def data(self): + return { + 'cause': self.cause, + 'message': self.message + } + + class DatabaseException(RuntimeException): CODE = 10003 MESSAGE = "Database Error" diff --git a/core/dbt/rpc.py b/core/dbt/rpc.py index 6e8f8bf1e0c..24171feba2c 100644 --- a/core/dbt/rpc.py +++ b/core/dbt/rpc.py @@ -347,12 +347,15 @@ def __init__(self, args, config): self.tasks = {} self.completed = {} self._rpc_task_map = {} - self._last_compile = LastCompile(status=ManifestStatus.Compiling) + self._last_compile = LastCompile(status=ManifestStatus.Init) self._lock = multiprocessing.Lock() def add_request(self, request_handler): self.tasks[request_handler.task_id] = request_handler + 
def reserve_handler(self, task): + self._rpc_task_map[task.METHOD_NAME] = None + def add_task_handler(self, task, manifest): self._rpc_task_map[task.METHOD_NAME] = task( self.args, self.config, manifest @@ -360,10 +363,7 @@ def add_task_handler(self, task, manifest): def rpc_task(self, method_name): with self._lock: - if self._last_compile.status == ManifestStatus.Ready: - return self._rpc_task_map[method_name] - else: - return None + return self._rpc_task_map[method_name] def ready(self): with self._lock: @@ -374,7 +374,6 @@ def set_compiling(self): f'invalid state {self._last_compile.status}' with self._lock: self._last_compile = LastCompile(status=ManifestStatus.Compiling) - self._rpc_task_map.clear() def set_compile_exception(self, exc): assert self._last_compile.status == ManifestStatus.Compiling, \ @@ -456,6 +455,14 @@ def process_kill(self, task_id): result['finished'] = True return result + def process_currently_compiling(self, *args, **kwargs): + raise dbt_error(dbt.exceptions.RPCCompiling('compile in progress')) + + def process_compilation_error(self, *args, **kwargs): + raise dbt_error( + dbt.exceptions.RPCLoadException(self._last_compile.error) + ) + def rpc_builtin(self, method_name): if method_name == 'ps': return self.process_listing @@ -463,6 +470,11 @@ def rpc_builtin(self, method_name): return self.process_kill if method_name == 'status': return self.process_status + if method_name in self._rpc_task_map: + if self._last_compile.status == ManifestStatus.Compiling: + return self.process_currently_compiling + if self._last_compile.status == ManifestStatus.Error: + return self.process_compilation_error return None def mark_done(self, request_handler): @@ -479,10 +491,7 @@ def methods(self): rpc_builtin_methods.append('kill') with self._lock: - if not self._last_compile == ManifestStatus.Ready: - task_map = [] - else: - task_map = list(self._rpc_task_map) + task_map = list(self._rpc_task_map) return task_map + rpc_builtin_methods diff --git a/core/dbt/task/rpc_server.py b/core/dbt/task/rpc_server.py index a11968555aa..d6dac6141a8 100644 --- a/core/dbt/task/rpc_server.py +++ b/core/dbt/task/rpc_server.py @@ -42,7 +42,7 @@ def reload_manager(task_manager, tasks): @contextmanager def signhup_replace(): - """Å context manager. Replace the current sighup handler with SIG_IGN on + """A context manager. Replace the current sighup handler with SIG_IGN on entering, and (if the current handler was not SIG_IGN) replace it on leaving. This is meant to be used inside a sighup handler itself to provide. a sort of locking model. 
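
Editor's note on the rpc.py hunks above: while the server is reloading (status 'compiling'), method names pre-registered by reserve_handler are intercepted in rpc_builtin and answered with RPCCompiling; after a failed reload they are answered with RPCLoadException wrapping the stored compile error. A rough client-side polling sketch, assuming those exception CODEs (10010/10011) surface as JSON-RPC error codes the way other dbt exception codes do in these tests, and assuming a server at localhost:8580 (both assumptions, not taken from the patch):

    # Sketch only: URL and retry cadence are placeholders, not part of the patch.
    import time
    import requests

    URL = 'http://localhost:8580/jsonrpc'

    def rpc(method, **params):
        payload = {'jsonrpc': '2.0', 'method': method, 'params': params, 'id': 1}
        return requests.post(URL, json=payload).json()

    resp = rpc('run_project')
    if 'error' in resp and resp['error'].get('code') == 10010:
        # server answered with RPCCompiling; wait for the reload to finish
        while rpc('status')['result']['status'] == 'compiling':
            time.sleep(0.5)
        resp = rpc('run_project')
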
@@ -85,7 +85,7 @@ def __init__(self, args, config, tasks=None): self._tasks = tasks or self._default_tasks() self.task_manager = rpc.TaskManager(self.args, self.config) self._reloader = None - reload_manager(self.task_manager, self._tasks) + self._reload_task_manager() # windows doesn't have SIGHUP so don't do sighup things if os.name != 'nt': @@ -97,6 +97,8 @@ def _reload_task_manager(self): """ # mark the task manager invalid for task running self.task_manager.set_compiling() + for task in self._tasks: + self.task_manager.reserve_handler(task) # compile in a thread that will fix up the tag manager when it's done reloader = threading.Thread( target=reload_manager, diff --git a/test/integration/042_sources_test/test_sources.py b/test/integration/042_sources_test/test_sources.py index eee15ee335d..34fbe1d9d39 100644 --- a/test/integration/042_sources_test/test_sources.py +++ b/test/integration/042_sources_test/test_sources.py @@ -308,755 +308,3 @@ def test_postgres_malformed_schema_nonstrict_will_not_break_run(self): def test_postgres_malformed_schema_strict_will_break_run(self): with self.assertRaises(CompilationException): self.run_dbt_with_vars(['run'], strict=True) - - -class ServerProcess(multiprocessing.Process): - def __init__(self, port, profiles_dir, cli_vars=None): - self.port = port - handle_and_check_args = [ - '--strict', 'rpc', '--log-cache-events', - '--port', str(self.port), - '--profiles-dir', profiles_dir - ] - if cli_vars: - handle_and_check_args.extend(['--vars', cli_vars]) - super().__init__( - target=handle_and_check, - args=(handle_and_check_args,), - name='ServerProcess') - - def is_up(self): - sock = socket.socket() - try: - sock.connect(('localhost', self.port)) - except socket.error: - return False - sock.close() - return True - - def start(self): - super().start() - for _ in range(10): - if self.is_up(): - break - time.sleep(0.5) - if not self.is_up(): - self.terminate() - raise Exception('server never appeared!') - - -def query_url(url, query): - headers = {'content-type': 'application/json'} - return requests.post(url, headers=headers, data=json.dumps(query)) - - -class BackgroundQueryProcess(multiprocessing.Process): - def __init__(self, query, url, group=None, name=None): - parent, child = multiprocessing.Pipe() - self.parent_pipe = parent - self.child_pipe = child - self.query = query - self.url = url - super().__init__(group=group, name=name) - - def run(self): - try: - result = query_url(self.url, self.query).json() - except Exception as exc: - self.child_pipe.send(('error', str(exc))) - else: - self.child_pipe.send(('result', result)) - - def wait_result(self): - result_type, result = self.parent_pipe.recv() - self.join() - if result_type == 'error': - raise Exception(result) - else: - return result - - -_select_from_ephemeral = '''with __dbt__CTE__ephemeral_model as ( - - -select 1 as id -)select * from __dbt__CTE__ephemeral_model''' - - -def addr_in_use(err, *args): - msg = str(err) - if 'Address already in use' in msg: - return True - if 'server never appeared!' 
in msg: - return True # this can happen because of the above - return False - - -@mark.flaky(rerun_filter=addr_in_use) -class TestRPCServer(BaseSourcesTest): - def setUp(self): - super().setUp() - port = random.randint(20000, 65535) - self._server = ServerProcess( - cli_vars='{{test_run_schema: {}}}'.format(self.unique_schema()), - profiles_dir=self.test_root_dir, - port=port - ) - self._server.start() - self.background_queries = [] - - def tearDown(self): - self._server.terminate() - for query in self.background_queries: - query.terminate() - super().tearDown() - - @property - def project_config(self): - return { - 'data-paths': ['data'], - 'quoting': {'database': True, 'schema': True, 'identifier': True}, - 'macro-paths': ['macros'], - } - - def build_query( - self, method, kwargs, sql=None, test_request_id=1, macros=None - ): - body_data = '' - if sql is not None: - body_data += sql - - if macros is not None: - body_data += macros - - if sql is not None or macros is not None: - kwargs['sql'] = b64(body_data.encode('utf-8')).decode('utf-8') - - return { - 'jsonrpc': '2.0', - 'method': method, - 'params': kwargs, - 'id': test_request_id - } - - @property - def url(self): - return 'http://localhost:{}/jsonrpc'.format(self._server.port) - - def query(self, _method, _sql=None, _test_request_id=1, macros=None, **kwargs): - built = self.build_query(_method, kwargs, _sql, _test_request_id, macros) - return query_url(self.url, built) - - def handle_result(self, bg_query, pipe): - result_type, result = pipe.recv() - bg_query.join() - if result_type == 'error': - raise result - else: - return result - - def background_query( - self, _method, _sql=None, _test_request_id=1, _block=False, macros=None, **kwargs - ): - built = self.build_query(_method, kwargs, _sql, _test_request_id, - macros) - - url = 'http://localhost:{}/jsonrpc'.format(self._server.port) - name = _method - if 'name' in kwargs: - name += ' ' + kwargs['name'] - bg_query = BackgroundQueryProcess(built, url, name=name) - self.background_queries.append(bg_query) - bg_query.start() - return bg_query - - def assertResultHasTimings(self, result, *names): - self.assertIn('timing', result) - timings = result['timing'] - self.assertEqual(len(timings), len(names)) - for expected_name, timing in zip(names, timings): - self.assertIn('name', timing) - self.assertEqual(timing['name'], expected_name) - self.assertIn('started_at', timing) - self.assertIn('completed_at', timing) - datetime.strptime(timing['started_at'], '%Y-%m-%dT%H:%M:%S.%fZ') - datetime.strptime(timing['completed_at'], '%Y-%m-%dT%H:%M:%S.%fZ') - - def assertIsResult(self, data, id_=1): - self.assertEqual(data['id'], id_) - self.assertEqual(data['jsonrpc'], '2.0') - self.assertIn('result', data) - self.assertNotIn('error', data) - return data['result'] - - def assertIsError(self, data, id_=1): - self.assertEqual(data['id'], id_) - self.assertEqual(data['jsonrpc'], '2.0') - self.assertIn('error', data) - self.assertNotIn('result', data) - return data['error'] - - def assertIsErrorWithCode(self, data, code, id_=1): - error = self.assertIsError(data, id_) - self.assertIn('code', error) - self.assertIn('message', error) - self.assertEqual(error['code'], code) - return error - - def assertIsErrorWith(self, data, code, message, error_data): - error = self.assertIsErrorWithCode(data, code) - if message is not None: - self.assertEqual(error['message'], message) - - if error_data is not None: - return self.assertHasErrorData(error, error_data) - else: - return error.get('data') - - def 
assertResultHasSql(self, data, raw_sql, compiled_sql=None): - if compiled_sql is None: - compiled_sql = raw_sql - result = self.assertIsResult(data) - self.assertIn('logs', result) - self.assertTrue(len(result['logs']) > 0) - self.assertIn('raw_sql', result) - self.assertIn('compiled_sql', result) - self.assertEqual(result['raw_sql'], raw_sql) - self.assertEqual(result['compiled_sql'], compiled_sql) - return result - - def assertSuccessfulCompilationResult(self, data, raw_sql, compiled_sql=None): - result = self.assertResultHasSql(data, raw_sql, compiled_sql) - self.assertNotIn('table', result) - # compile results still have an 'execute' timing, it just represents - # the time to construct a result object. - self.assertResultHasTimings(result, 'compile', 'execute') - - def assertSuccessfulRunResult(self, data, raw_sql, compiled_sql=None, table=None): - result = self.assertResultHasSql(data, raw_sql, compiled_sql) - self.assertIn('table', result) - if table is not None: - self.assertEqual(result['table'], table) - self.assertResultHasTimings(result, 'compile', 'execute') - - @use_profile('postgres') - def test_compile_postgres(self): - trivial = self.query( - 'compile', - 'select 1 as id', - name='foo' - ).json() - self.assertSuccessfulCompilationResult( - trivial, 'select 1 as id' - ) - - ref = self.query( - 'compile', - 'select * from {{ ref("descendant_model") }}', - name='foo' - ).json() - self.assertSuccessfulCompilationResult( - ref, - 'select * from {{ ref("descendant_model") }}', - compiled_sql='select * from "{}"."{}"."descendant_model"'.format( - self.default_database, - self.unique_schema()) - ) - - source = self.query( - 'compile', - 'select * from {{ source("test_source", "test_table") }}', - name='foo' - ).json() - self.assertSuccessfulCompilationResult( - source, - 'select * from {{ source("test_source", "test_table") }}', - compiled_sql='select * from "{}"."{}"."source"'.format( - self.default_database, - self.unique_schema()) - ) - - macro = self.query( - 'compile', - 'select {{ my_macro() }}', - name='foo', - macros='{% macro my_macro() %}1 as id{% endmacro %}' - ).json() - self.assertSuccessfulCompilationResult( - macro, - 'select {{ my_macro() }}', - compiled_sql='select 1 as id' - ) - - macro_override = self.query( - 'compile', - 'select {{ happy_little_macro() }}', - name='foo', - macros='{% macro override_me() %}2 as id{% endmacro %}' - ).json() - self.assertSuccessfulCompilationResult( - macro_override, - 'select {{ happy_little_macro() }}', - compiled_sql='select 2 as id' - ) - - macro_override_with_if_statement = self.query( - 'compile', - '{% if True %}select {{ happy_little_macro() }}{% endif %}', - name='foo', - macros='{% macro override_me() %}2 as id{% endmacro %}' - ).json() - self.assertSuccessfulCompilationResult( - macro_override_with_if_statement, - '{% if True %}select {{ happy_little_macro() }}{% endif %}', - compiled_sql='select 2 as id' - ) - - ephemeral = self.query( - 'compile', - 'select * from {{ ref("ephemeral_model") }}', - name='foo' - ).json() - self.assertSuccessfulCompilationResult( - ephemeral, - 'select * from {{ ref("ephemeral_model") }}', - compiled_sql=_select_from_ephemeral - ) - - @use_profile('postgres') - def test_run_postgres(self): - # seed + run dbt to make models before using them! 
- self.run_dbt_with_vars(['seed']) - self.run_dbt_with_vars(['run']) - data = self.query( - 'run', - 'select 1 as id', - name='foo' - ).json() - self.assertSuccessfulRunResult( - data, 'select 1 as id', table={'column_names': ['id'], 'rows': [[1.0]]} - ) - - ref = self.query( - 'run', - 'select * from {{ ref("descendant_model") }} order by updated_at limit 1', - name='foo' - ).json() - self.assertSuccessfulRunResult( - ref, - 'select * from {{ ref("descendant_model") }} order by updated_at limit 1', - compiled_sql='select * from "{}"."{}"."descendant_model" order by updated_at limit 1'.format( - self.default_database, - self.unique_schema()), - table={ - 'column_names': ['favorite_color', 'id', 'first_name', 'email', 'ip_address', 'updated_at'], - 'rows': [['blue', 38.0, 'Gary', 'gray11@statcounter.com', "'40.193.124.56'", '1970-01-27T10:04:51']], - } - ) - - source = self.query( - 'run', - 'select * from {{ source("test_source", "test_table") }} order by updated_at limit 1', - name='foo' - ).json() - self.assertSuccessfulRunResult( - source, - 'select * from {{ source("test_source", "test_table") }} order by updated_at limit 1', - compiled_sql='select * from "{}"."{}"."source" order by updated_at limit 1'.format( - self.default_database, - self.unique_schema()), - table={ - 'column_names': ['favorite_color', 'id', 'first_name', 'email', 'ip_address', 'updated_at'], - 'rows': [['blue', 38.0, 'Gary', 'gray11@statcounter.com', "'40.193.124.56'", '1970-01-27T10:04:51']], - } - ) - - macro = self.query( - 'run', - 'select {{ my_macro() }}', - name='foo', - macros='{% macro my_macro() %}1 as id{% endmacro %}' - ).json() - self.assertSuccessfulRunResult( - macro, - raw_sql='select {{ my_macro() }}', - compiled_sql='select 1 as id', - table={'column_names': ['id'], 'rows': [[1.0]]} - ) - - macro_override = self.query( - 'run', - 'select {{ happy_little_macro() }}', - name='foo', - macros='{% macro override_me() %}2 as id{% endmacro %}' - ).json() - self.assertSuccessfulRunResult( - macro_override, - raw_sql='select {{ happy_little_macro() }}', - compiled_sql='select 2 as id', - table={'column_names': ['id'], 'rows': [[2.0]]} - ) - - macro_override_with_if_statement = self.query( - 'run', - '{% if True %}select {{ happy_little_macro() }}{% endif %}', - name='foo', - macros='{% macro override_me() %}2 as id{% endmacro %}' - ).json() - self.assertSuccessfulRunResult( - macro_override_with_if_statement, - '{% if True %}select {{ happy_little_macro() }}{% endif %}', - compiled_sql='select 2 as id', - table={'column_names': ['id'], 'rows': [[2.0]]} - ) - - macro_with_raw_statement = self.query( - 'run', - '{% raw %}select 1 as{% endraw %}{{ test_macros() }}{% macro test_macros() %} id{% endmacro %}', - name='foo' - ).json() - self.assertSuccessfulRunResult( - macro_with_raw_statement, - '{% raw %}select 1 as{% endraw %}{{ test_macros() }}', - compiled_sql='select 1 as id', - table={'column_names': ['id'], 'rows': [[1.0]]} - ) - - macro_with_comment = self.query( - 'run', - '{% raw %}select 1 {% endraw %}{{ test_macros() }} {# my comment #}{% macro test_macros() -%} as{% endmacro %} id{# another comment #}', - name='foo' - ).json() - self.assertSuccessfulRunResult( - macro_with_comment, - '{% raw %}select 1 {% endraw %}{{ test_macros() }} {# my comment #} id{# another comment #}', - compiled_sql='select 1 as id', - table={'column_names': ['id'], 'rows': [[1.0]]} - ) - - ephemeral = self.query( - 'run', - 'select * from {{ ref("ephemeral_model") }}', - name='foo' - ).json() - 
self.assertSuccessfulRunResult( - ephemeral, - raw_sql='select * from {{ ref("ephemeral_model") }}', - compiled_sql=_select_from_ephemeral, - table={'column_names': ['id'], 'rows': [[1.0]]} - ) - - @mark.skipif(os.name == 'nt', reason='"kill" not supported on windows') - @mark.flaky(rerun_filter=None) - @use_profile('postgres') - def test_ps_kill_postgres(self): - done_query = self.query('compile', 'select 1 as id', name='done').json() - self.assertIsResult(done_query) - pg_sleeper, sleep_task_id, request_id = self._get_sleep_query() - - empty_ps_result = self.query('ps', completed=False, active=False).json() - result = self.assertIsResult(empty_ps_result) - self.assertEqual(len(result['rows']), 0) - - sleeper_ps_result = self.query('ps', completed=False, active=True).json() - result = self.assertIsResult(sleeper_ps_result) - self.assertEqual(len(result['rows']), 1) - rowdict = result['rows'] - self.assertEqual(rowdict[0]['request_id'], request_id) - self.assertEqual(rowdict[0]['method'], 'run') - self.assertEqual(rowdict[0]['state'], 'running') - self.assertEqual(rowdict[0]['timeout'], None) - - complete_ps_result = self.query('ps', completed=True, active=False).json() - result = self.assertIsResult(complete_ps_result) - self.assertEqual(len(result['rows']), 1) - rowdict = result['rows'] - self.assertEqual(rowdict[0]['request_id'], 1) - self.assertEqual(rowdict[0]['method'], 'compile') - self.assertEqual(rowdict[0]['state'], 'finished') - self.assertEqual(rowdict[0]['timeout'], None) - - all_ps_result = self.query('ps', completed=True, active=True).json() - result = self.assertIsResult(all_ps_result) - self.assertEqual(len(result['rows']), 2) - rowdict = result['rows'] - rowdict.sort(key=lambda r: r['start']) - self.assertEqual(rowdict[0]['request_id'], 1) - self.assertEqual(rowdict[0]['method'], 'compile') - self.assertEqual(rowdict[0]['state'], 'finished') - self.assertEqual(rowdict[0]['timeout'], None) - self.assertEqual(rowdict[1]['request_id'], request_id) - self.assertEqual(rowdict[1]['method'], 'run') - self.assertEqual(rowdict[1]['state'], 'running') - self.assertEqual(rowdict[1]['timeout'], None) - - self.kill_and_assert(pg_sleeper, sleep_task_id, request_id) - - def kill_and_assert(self, pg_sleeper, task_id, request_id): - kill_result = self.query('kill', task_id=task_id).json() - kill_time = time.time() - result = self.assertIsResult(kill_result) - self.assertTrue(result['killed']) - - sleeper_result = pg_sleeper.wait_result() - result_time = time.time() - error = self.assertIsErrorWithCode(sleeper_result, 10009, request_id) - self.assertEqual(error['message'], 'RPC process killed') - self.assertIn('data', error) - error_data = error['data'] - self.assertEqual(error_data['signum'], 2) - self.assertEqual(error_data['message'], 'RPC process killed by signal 2') - self.assertIn('logs', error_data) - return error_data - - def _get_sleep_query(self, request_id=90890, duration=15): - pg_sleeper = self.background_query( - 'run', - 'select pg_sleep({})'.format(duration), - _test_request_id=request_id, - name='sleeper', - ) - - for _ in range(20): - time.sleep(0.2) - sleeper_ps_result = self.query('ps', completed=False, active=True).json() - result = self.assertIsResult(sleeper_ps_result) - rows = result['rows'] - for row in rows: - if row['request_id'] == request_id and row['state'] == 'running': - return pg_sleeper, row['task_id'], request_id - - self.assertTrue(False, 'request ID never found running!') - - @mark.skipif(os.name == 'nt', reason='"kill" not supported on windows') - 
@mark.flaky(rerun_filter=lambda *a, **kw: True) - @use_profile('postgres') - def test_ps_kill_longwait_postgres(self): - pg_sleeper, sleep_task_id, request_id = self._get_sleep_query() - - # the test above frequently kills the process during parsing of the - # requested node. That's also a useful test, but we should test that - # we cancel the in-progress sleep query. - time.sleep(3) - - error_data = self.kill_and_assert(pg_sleeper, sleep_task_id, request_id) - # we should have logs if we did anything - self.assertTrue(len(error_data['logs']) > 0) - - @use_profile('postgres') - def test_invalid_requests_postgres(self): - data = self.query( - 'xxxxxnotamethodxxxxx', - 'hi this is not sql' - ).json() - self.assertIsErrorWith(data, -32601, 'Method not found', None) - - data = self.query( - 'compile', - 'select * from {{ reff("nonsource_descendant") }}', - name='mymodel' - ).json() - error_data = self.assertIsErrorWith(data, 10004, 'Compilation Error', { - 'type': 'CompilationException', - 'message': "Compilation Error in rpc mymodel (from remote system)\n 'reff' is undefined", - 'compiled_sql': None, - 'raw_sql': 'select * from {{ reff("nonsource_descendant") }}', - }) - self.assertIn('logs', error_data) - self.assertTrue(len(error_data['logs']) > 0) - - data = self.query( - 'run', - 'hi this is not sql', - name='foo' - ).json() - error_data = self.assertIsErrorWith(data, 10003, 'Database Error', { - 'type': 'DatabaseException', - 'message': 'Database Error in rpc foo (from remote system)\n syntax error at or near "hi"\n LINE 1: hi this is not sql\n ^', - 'compiled_sql': 'hi this is not sql', - 'raw_sql': 'hi this is not sql', - }) - self.assertIn('logs', error_data) - self.assertTrue(len(error_data['logs']) > 0) - - macro_no_override = self.query( - 'run', - 'select {{ happy_little_macro() }}', - name='foo', - ).json() - error_data = self.assertIsErrorWith(macro_no_override, 10004, 'Compilation Error', { - 'type': 'CompilationException', - 'raw_sql': 'select {{ happy_little_macro() }}', - 'compiled_sql': None - }) - self.assertIn('logs', error_data) - self.assertTrue(len(error_data['logs']) > 0) - - def assertHasErrorData(self, error, expected_error_data): - self.assertIn('data', error) - error_data = error['data'] - for key, value in expected_error_data.items(): - self.assertIn(key, error_data) - self.assertEqual(error_data[key], value) - return error_data - - @use_profile('postgres') - def test_timeout_postgres(self): - data = self.query( - 'run', - 'select from pg_sleep(5)', - name='foo', - timeout=1 - ).json() - error = self.assertIsErrorWithCode(data, 10008) - self.assertEqual(error['message'], 'RPC timeout error') - self.assertIn('data', error) - error_data = error['data'] - self.assertIn('timeout', error_data) - self.assertEqual(error_data['timeout'], 1) - self.assertIn('message', error_data) - self.assertEqual(error_data['message'], 'RPC timed out after 1s') - self.assertIn('logs', error_data) - # on windows, process start is so slow that frequently we won't have collected any logs - if os.name != 'nt': - self.assertTrue(len(error_data['logs']) > 0) - - @use_profile('postgres') - def test_seed_project_postgres(self): - # testing "dbt seed" is tricky so we'll just jam some sql in there - self.run_sql_file("seed.sql") - result = self.query('seed_project', show=True).json() - dct = self.assertIsResult(result) - self.assertTablesEqual('source', 'seed_expected') - self.assertIn('results', dct) - results = dct['results'] - self.assertEqual(len(results), 4) - self.assertEqual( - 
set(r['node']['name'] for r in results), - {'expected_multi_source', 'other_source_table', 'other_table', 'source'} - ) - - @use_profile('postgres') - def test_compile_project_postgres(self): - self.run_dbt_with_vars(['seed']) - result = self.query('compile_project').json() - dct = self.assertIsResult(result) - self.assertIn('results', dct) - results = dct['results'] - self.assertEqual(len(results), 11) - compiled = set(r['node']['name'] for r in results) - self.assertTrue(compiled.issuperset( - {'descendant_model', 'multi_source_model', 'nonsource_descendant'} - )) - self.assertNotIn('ephemeral_model', compiled) - - @use_profile('postgres') - def test_run_project_postgres(self): - self.run_dbt_with_vars(['seed']) - result = self.query('run_project').json() - dct = self.assertIsResult(result) - self.assertIn('results', dct) - results = dct['results'] - self.assertEqual(len(results), 3) - self.assertEqual( - set(r['node']['name'] for r in results), - {'descendant_model', 'multi_source_model', 'nonsource_descendant'} - ) - self.assertTablesEqual('multi_source_model', 'expected_multi_source') - - @use_profile('postgres') - def test_test_project_postgres(self): - self.run_dbt_with_vars(['seed']) - result = self.query('run_project').json() - dct = self.assertIsResult(result) - result = self.query('test_project').json() - dct = self.assertIsResult(result) - self.assertIn('results', dct) - results = dct['results'] - self.assertEqual(len(results), 4) - for result in results: - self.assertEqual(result['status'], 0.0) - self.assertNotIn('fail', result) - - def _wait_for_running(self, timeout=15, raise_on_timeout=True): - started = time.time() - time.sleep(0.5) - elapsed = time.time() - started - - while elapsed < timeout: - status = self.assertIsResult(self.query('status').json()) - if status['status'] == 'running': - return status - time.sleep(0.5) - elapsed = time.time() - started - - status = self.assertIsResult(self.query('status').json()) - if raise_on_timeout: - self.assertEqual( - status['status'], - 'ready', - f'exceeded max time of {timeout}: {elapsed} seconds elapsed' - ) - return status - - def assertRunning(self, sleepers): - sleeper_ps_result = self.query('ps', completed=False, active=True).json() - result = self.assertIsResult(sleeper_ps_result) - self.assertEqual(len(result['rows']), len(sleepers)) - result_map = {rd['request_id']: rd for rd in result['rows']} - for _, _, request_id in sleepers: - found = result_map[request_id] - self.assertEqual(found['request_id'], request_id) - self.assertEqual(found['method'], 'run') - self.assertEqual(found['state'], 'running') - self.assertEqual(found['timeout'], None) - - def _add_command(self, cmd, id_): - self.assertIsResult(self.query(cmd, _test_request_id=id_).json(), id_=id_) - - @mark.skipif(os.name == 'nt', reason='"sighup" not supported on windows') - @mark.flaky(rerun_filter=lambda *a, **kw: True) - @use_profile('postgres') - def test_sighup_postgres(self): - status = self.assertIsResult(self.query('status').json()) - self.assertEqual(status['status'], 'ready') - started_at = status['timestamp'] - - done_query = self.query('compile', 'select 1 as id', name='done').json() - self.assertIsResult(done_query) - sleepers = [] - command_ids = [] - - sleepers.append(self._get_sleep_query(1000, duration=60)) - self.assertRunning(sleepers) - - self._add_command('seed_project', 20) - command_ids.append(20) - self._add_command('run_project', 21) - command_ids.append(21) - - # sighup a few times - for _ in range(10): - os.kill(status['pid'], 
signal.SIGHUP) - - status = self._wait_for_running() - - # we should still still see our service: - self.assertRunning(sleepers) - - self._add_command('seed_project', 30) - command_ids.append(30) - self._add_command('run_project', 31) - command_ids.append(31) - - # start a new one too - sleepers.append(self._get_sleep_query(1001, duration=60)) - - # now we should see both - self.assertRunning(sleepers) - - # now pluck out the oldest one and kill it - dead, alive = sleepers - self.kill_and_assert(*dead) - self.assertRunning([alive]) - self.kill_and_assert(*alive) diff --git a/test/integration/048_rpc_test/data/expected_multi_source.csv b/test/integration/048_rpc_test/data/expected_multi_source.csv new file mode 100644 index 00000000000..de9c1c01da2 --- /dev/null +++ b/test/integration/048_rpc_test/data/expected_multi_source.csv @@ -0,0 +1,4 @@ +id,first_name,color +1,Larry,blue +2,Curly,red +3,Moe,green diff --git a/test/integration/048_rpc_test/data/other_source_table.csv b/test/integration/048_rpc_test/data/other_source_table.csv new file mode 100644 index 00000000000..a92b2cb8ee6 --- /dev/null +++ b/test/integration/048_rpc_test/data/other_source_table.csv @@ -0,0 +1,4 @@ +id,color +1,blue +2,red +3,green diff --git a/test/integration/048_rpc_test/data/other_table.csv b/test/integration/048_rpc_test/data/other_table.csv new file mode 100644 index 00000000000..56bdda92b65 --- /dev/null +++ b/test/integration/048_rpc_test/data/other_table.csv @@ -0,0 +1,4 @@ +id,first_name +1,Larry +2,Curly +3,Moe diff --git a/test/integration/048_rpc_test/data/source.csv b/test/integration/048_rpc_test/data/source.csv new file mode 100644 index 00000000000..a8f87412ef5 --- /dev/null +++ b/test/integration/048_rpc_test/data/source.csv @@ -0,0 +1,101 @@ +favorite_color,id,first_name,email,ip_address,updated_at +blue,1,Larry,lking0@miitbeian.gov.cn,'69.135.206.194',2008-09-12 19:08:31 +blue,2,Larry,lperkins1@toplist.cz,'64.210.133.162',1978-05-09 04:15:14 +blue,3,Anna,amontgomery2@miitbeian.gov.cn,'168.104.64.114',2011-10-16 04:07:57 +blue,4,Sandra,sgeorge3@livejournal.com,'229.235.252.98',1973-07-19 10:52:43 +blue,5,Fred,fwoods4@google.cn,'78.229.170.124',2012-09-30 16:38:29 +blue,6,Stephen,shanson5@livejournal.com,'182.227.157.105',1995-11-07 21:40:50 +blue,7,William,wmartinez6@upenn.edu,'135.139.249.50',1982-09-05 03:11:59 +blue,8,Jessica,jlong7@hao123.com,'203.62.178.210',1991-10-16 11:03:15 +blue,9,Douglas,dwhite8@tamu.edu,'178.187.247.1',1979-10-01 09:49:48 +blue,10,Lisa,lcoleman9@nydailynews.com,'168.234.128.249',2011-05-26 07:45:49 +blue,11,Ralph,rfieldsa@home.pl,'55.152.163.149',1972-11-18 19:06:11 +blue,12,Louise,lnicholsb@samsung.com,'141.116.153.154',2014-11-25 20:56:14 +blue,13,Clarence,cduncanc@sfgate.com,'81.171.31.133',2011-11-17 07:02:36 +blue,14,Daniel,dfranklind@omniture.com,'8.204.211.37',1980-09-13 00:09:04 +blue,15,Katherine,klanee@auda.org.au,'176.96.134.59',1997-08-22 19:36:56 +blue,16,Billy,bwardf@wikia.com,'214.108.78.85',2003-10-19 02:14:47 +blue,17,Annie,agarzag@ocn.ne.jp,'190.108.42.70',1988-10-28 15:12:35 +blue,18,Shirley,scolemanh@fastcompany.com,'109.251.164.84',1988-08-24 10:50:57 +blue,19,Roger,rfrazieri@scribd.com,'38.145.218.108',1985-12-31 15:17:15 +blue,20,Lillian,lstanleyj@goodreads.com,'47.57.236.17',1970-06-08 02:09:05 +blue,21,Aaron,arodriguezk@nps.gov,'205.245.118.221',1985-10-11 23:07:49 +blue,22,Patrick,pparkerl@techcrunch.com,'19.8.100.182',2006-03-29 12:53:56 +blue,23,Phillip,pmorenom@intel.com,'41.38.254.103',2011-11-07 15:35:43 
+blue,24,Henry,hgarcian@newsvine.com,'1.191.216.252',2008-08-28 08:30:44 +blue,25,Irene,iturnero@opera.com,'50.17.60.190',1994-04-01 07:15:02 +blue,26,Andrew,adunnp@pen.io,'123.52.253.176',2000-11-01 06:03:25 +blue,27,David,dgutierrezq@wp.com,'238.23.203.42',1988-01-25 07:29:18 +blue,28,Henry,hsanchezr@cyberchimps.com,'248.102.2.185',1983-01-01 13:36:37 +blue,29,Evelyn,epetersons@gizmodo.com,'32.80.46.119',1979-07-16 17:24:12 +blue,30,Tammy,tmitchellt@purevolume.com,'249.246.167.88',2001-04-03 10:00:23 +blue,31,Jacqueline,jlittleu@domainmarket.com,'127.181.97.47',1986-02-11 21:35:50 +blue,32,Earl,eortizv@opera.com,'166.47.248.240',1996-07-06 08:16:27 +blue,33,Juan,jgordonw@sciencedirect.com,'71.77.2.200',1987-01-31 03:46:44 +blue,34,Diane,dhowellx@nyu.edu,'140.94.133.12',1994-06-11 02:30:05 +blue,35,Randy,rkennedyy@microsoft.com,'73.255.34.196',2005-05-26 20:28:39 +blue,36,Janice,jriveraz@time.com,'22.214.227.32',1990-02-09 04:16:52 +blue,37,Laura,lperry10@diigo.com,'159.148.145.73',2015-03-17 05:59:25 +blue,38,Gary,gray11@statcounter.com,'40.193.124.56',1970-01-27 10:04:51 +blue,39,Jesse,jmcdonald12@typepad.com,'31.7.86.103',2009-03-14 08:14:29 +blue,40,Sandra,sgonzalez13@goodreads.com,'223.80.168.239',1993-05-21 14:08:54 +blue,41,Scott,smoore14@archive.org,'38.238.46.83',1980-08-30 11:16:56 +blue,42,Phillip,pevans15@cisco.com,'158.234.59.34',2011-12-15 23:26:31 +blue,43,Steven,sriley16@google.ca,'90.247.57.68',2011-10-29 19:03:28 +blue,44,Deborah,dbrown17@hexun.com,'179.125.143.240',1995-04-10 14:36:07 +blue,45,Lori,lross18@ow.ly,'64.80.162.180',1980-12-27 16:49:15 +blue,46,Sean,sjackson19@tumblr.com,'240.116.183.69',1988-06-12 21:24:45 +blue,47,Terry,tbarnes1a@163.com,'118.38.213.137',1997-09-22 16:43:19 +blue,48,Dorothy,dross1b@ebay.com,'116.81.76.49',2005-02-28 13:33:24 +blue,49,Samuel,swashington1c@house.gov,'38.191.253.40',1989-01-19 21:15:48 +blue,50,Ralph,rcarter1d@tinyurl.com,'104.84.60.174',2007-08-11 10:21:49 +green,51,Wayne,whudson1e@princeton.edu,'90.61.24.102',1983-07-03 16:58:12 +green,52,Rose,rjames1f@plala.or.jp,'240.83.81.10',1995-06-08 11:46:23 +green,53,Louise,lcox1g@theglobeandmail.com,'105.11.82.145',2016-09-19 14:45:51 +green,54,Kenneth,kjohnson1h@independent.co.uk,'139.5.45.94',1976-08-17 11:26:19 +green,55,Donna,dbrown1i@amazon.co.uk,'19.45.169.45',2006-05-27 16:51:40 +green,56,Johnny,jvasquez1j@trellian.com,'118.202.238.23',1975-11-17 08:42:32 +green,57,Patrick,pramirez1k@tamu.edu,'231.25.153.198',1997-08-06 11:51:09 +green,58,Helen,hlarson1l@prweb.com,'8.40.21.39',1993-08-04 19:53:40 +green,59,Patricia,pspencer1m@gmpg.org,'212.198.40.15',1977-08-03 16:37:27 +green,60,Joseph,jspencer1n@marriott.com,'13.15.63.238',2005-07-23 20:22:06 +green,61,Phillip,pschmidt1o@blogtalkradio.com,'177.98.201.190',1976-05-19 21:47:44 +green,62,Joan,jwebb1p@google.ru,'105.229.170.71',1972-09-07 17:53:47 +green,63,Phyllis,pkennedy1q@imgur.com,'35.145.8.244',2000-01-01 22:33:37 +green,64,Katherine,khunter1r@smh.com.au,'248.168.205.32',1991-01-09 06:40:24 +green,65,Laura,lvasquez1s@wiley.com,'128.129.115.152',1997-10-23 12:04:56 +green,66,Juan,jdunn1t@state.gov,'44.228.124.51',2004-11-10 05:07:35 +green,67,Judith,jholmes1u@wiley.com,'40.227.179.115',1977-08-02 17:01:45 +green,68,Beverly,bbaker1v@wufoo.com,'208.34.84.59',2016-03-06 20:07:23 +green,69,Lawrence,lcarr1w@flickr.com,'59.158.212.223',1988-09-13 06:07:21 +green,70,Gloria,gwilliams1x@mtv.com,'245.231.88.33',1995-03-18 22:32:46 +green,71,Steven,ssims1y@cbslocal.com,'104.50.58.255',2001-08-05 21:26:20 
+green,72,Betty,bmills1z@arstechnica.com,'103.177.214.220',1981-12-14 21:26:54 +green,73,Mildred,mfuller20@prnewswire.com,'151.158.8.130',2000-04-19 10:13:55 +green,74,Donald,dday21@icq.com,'9.178.102.255',1972-12-03 00:58:24 +green,75,Eric,ethomas22@addtoany.com,'85.2.241.227',1992-11-01 05:59:30 +green,76,Joyce,jarmstrong23@sitemeter.com,'169.224.20.36',1985-10-24 06:50:01 +green,77,Maria,mmartinez24@amazonaws.com,'143.189.167.135',2005-10-05 05:17:42 +green,78,Harry,hburton25@youtube.com,'156.47.176.237',1978-03-26 05:53:33 +green,79,Kevin,klawrence26@hao123.com,'79.136.183.83',1994-10-12 04:38:52 +green,80,David,dhall27@prweb.com,'133.149.172.153',1976-12-15 16:24:24 +green,81,Kathy,kperry28@twitter.com,'229.242.72.228',1979-03-04 02:58:56 +green,82,Adam,aprice29@elegantthemes.com,'13.145.21.10',1982-11-07 11:46:59 +green,83,Brandon,bgriffin2a@va.gov,'73.249.128.212',2013-10-30 05:30:36 +green,84,Henry,hnguyen2b@discovery.com,'211.36.214.242',1985-01-09 06:37:27 +green,85,Eric,esanchez2c@edublogs.org,'191.166.188.251',2004-05-01 23:21:42 +green,86,Jason,jlee2d@jimdo.com,'193.92.16.182',1973-01-08 09:05:39 +green,87,Diana,drichards2e@istockphoto.com,'19.130.175.245',1994-10-05 22:50:49 +green,88,Andrea,awelch2f@abc.net.au,'94.155.233.96',2002-04-26 08:41:44 +green,89,Louis,lwagner2g@miitbeian.gov.cn,'26.217.34.111',2003-08-25 07:56:39 +green,90,Jane,jsims2h@seesaa.net,'43.4.220.135',1987-03-20 20:39:04 +green,91,Larry,lgrant2i@si.edu,'97.126.79.34',2000-09-07 20:26:19 +green,92,Louis,ldean2j@prnewswire.com,'37.148.40.127',2011-09-16 20:12:14 +green,93,Jennifer,jcampbell2k@xing.com,'38.106.254.142',1988-07-15 05:06:49 +green,94,Wayne,wcunningham2l@google.com.hk,'223.28.26.187',2009-12-15 06:16:54 +green,95,Lori,lstevens2m@icq.com,'181.250.181.58',1984-10-28 03:29:19 +green,96,Judy,jsimpson2n@marriott.com,'180.121.239.219',1986-02-07 15:18:10 +green,97,Phillip,phoward2o@usa.gov,'255.247.0.175',2002-12-26 08:44:45 +green,98,Gloria,gwalker2p@usa.gov,'156.140.7.128',1997-10-04 07:58:58 +green,99,Paul,pjohnson2q@umn.edu,'183.59.198.197',1991-11-14 12:33:55 +green,100,Frank,fgreene2r@blogspot.com,'150.143.68.121',2010-06-12 23:55:39 diff --git a/test/integration/048_rpc_test/error_models/model.sql b/test/integration/048_rpc_test/error_models/model.sql new file mode 100644 index 00000000000..55bbcba67b4 --- /dev/null +++ b/test/integration/048_rpc_test/error_models/model.sql @@ -0,0 +1 @@ +select * from {{ source('test_source', 'test_table') }} diff --git a/test/integration/048_rpc_test/error_models/schema.yml b/test/integration/048_rpc_test/error_models/schema.yml new file mode 100644 index 00000000000..69cf1f304a6 --- /dev/null +++ b/test/integration/048_rpc_test/error_models/schema.yml @@ -0,0 +1,12 @@ +version: 2 +sources: + - name: test_source + loader: custom + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: invalid + tables: + - name: test_table + identifier: source + loaded_at_field: updated_at diff --git a/test/integration/048_rpc_test/macros/macro.sql b/test/integration/048_rpc_test/macros/macro.sql new file mode 100644 index 00000000000..a607a6e4ce7 --- /dev/null +++ b/test/integration/048_rpc_test/macros/macro.sql @@ -0,0 +1,14 @@ +{% macro override_me() -%} + {{ exceptions.raise_compiler_error('this is a bad macro') }} +{%- endmacro %} + +{% macro happy_little_macro() -%} + {{ override_me() }} +{%- endmacro %} + + +{% macro vacuum_source(source_name, table_name) -%} + {% call statement('stmt', auto_begin=false, fetch_result=false) 
%} + vacuum {{ source(source_name, table_name) }} + {% endcall %} +{%- endmacro %} diff --git a/test/integration/048_rpc_test/malformed_models/descendant_model.sql b/test/integration/048_rpc_test/malformed_models/descendant_model.sql new file mode 100644 index 00000000000..55bbcba67b4 --- /dev/null +++ b/test/integration/048_rpc_test/malformed_models/descendant_model.sql @@ -0,0 +1 @@ +select * from {{ source('test_source', 'test_table') }} diff --git a/test/integration/048_rpc_test/malformed_models/schema.yml b/test/integration/048_rpc_test/malformed_models/schema.yml new file mode 100644 index 00000000000..6962204bc7f --- /dev/null +++ b/test/integration/048_rpc_test/malformed_models/schema.yml @@ -0,0 +1,14 @@ +version: 2 +sources: + - name: test_source + loader: custom + schema: "{{ var('test_run_schema') }}" + tables: + - name: test_table + identifier: source + tests: + - relationships: + # this is invalid + - column_name: favorite_color + - to: ref('descendant_model') + - field: favorite_color diff --git a/test/integration/048_rpc_test/models/descendant_model.sql b/test/integration/048_rpc_test/models/descendant_model.sql new file mode 100644 index 00000000000..55bbcba67b4 --- /dev/null +++ b/test/integration/048_rpc_test/models/descendant_model.sql @@ -0,0 +1 @@ +select * from {{ source('test_source', 'test_table') }} diff --git a/test/integration/048_rpc_test/models/ephemeral_model.sql b/test/integration/048_rpc_test/models/ephemeral_model.sql new file mode 100644 index 00000000000..8de35cd3e21 --- /dev/null +++ b/test/integration/048_rpc_test/models/ephemeral_model.sql @@ -0,0 +1,3 @@ +{{ config(materialized='ephemeral') }} + +select 1 as id diff --git a/test/integration/048_rpc_test/models/multi_source_model.sql b/test/integration/048_rpc_test/models/multi_source_model.sql new file mode 100644 index 00000000000..e310206b0b4 --- /dev/null +++ b/test/integration/048_rpc_test/models/multi_source_model.sql @@ -0,0 +1,2 @@ +select * from {{ source('test_source', 'other_test_table')}} + join {{ source('other_source', 'test_table')}} using (id) diff --git a/test/integration/048_rpc_test/models/nonsource_descendant.sql b/test/integration/048_rpc_test/models/nonsource_descendant.sql new file mode 100644 index 00000000000..97f2151c754 --- /dev/null +++ b/test/integration/048_rpc_test/models/nonsource_descendant.sql @@ -0,0 +1 @@ +select * from {{ schema }}.source diff --git a/test/integration/048_rpc_test/models/schema.yml b/test/integration/048_rpc_test/models/schema.yml new file mode 100644 index 00000000000..e681d043377 --- /dev/null +++ b/test/integration/048_rpc_test/models/schema.yml @@ -0,0 +1,63 @@ +version: 2 +models: + - name: descendant_model + columns: + - name: favorite_color + tests: + - relationships: + to: source('test_source', 'test_table') + field: favorite_color +sources: + - name: test_source + loader: custom + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}" + quoting: + identifier: True + tables: + - name: test_table + identifier: source + loaded_at_field: updated_at + freshness: + error_after: {count: 18, period: hour} + columns: + - name: favorite_color + description: The favorite color + - name: id + description: The user ID + tests: + - unique + - not_null + - name: first_name + description: The first name of the user + tests: [] + - name: email + description: The email address of the user + - name: ip_address + description: The last IP address the user logged 
in from + - name: updated_at + description: The last update time for this user + tests: + - relationships: + # do this as a table-level test, just to test out that aspect + column_name: favorite_color + to: ref('descendant_model') + field: favorite_color + - name: other_test_table + identifier: other_table + - name: disabled_test_table + freshness: null + loaded_at_field: updated_at + - name: other_source + schema: "{{ var('test_run_schema') }}" + quoting: + identifier: True + tables: + - name: test_table + identifier: other_source_table + - name: external_source + schema: "{{ var('test_run_alt_schema', var('test_run_schema')) }}" + tables: + - name: table diff --git a/test/integration/048_rpc_test/seed.sql b/test/integration/048_rpc_test/seed.sql new file mode 100644 index 00000000000..40110b99088 --- /dev/null +++ b/test/integration/048_rpc_test/seed.sql @@ -0,0 +1,113 @@ +create table {schema}.seed_expected ( + favorite_color TEXT, + id INTEGER, + first_name TEXT, + email TEXT, + ip_address TEXT, + updated_at TIMESTAMP WITHOUT TIME ZONE +); + + +INSERT INTO {schema}.seed_expected + ("favorite_color","id","first_name","email","ip_address","updated_at") +VALUES + ('blue',1,'Larry','lking0@miitbeian.gov.cn','''69.135.206.194''','2008-09-12 19:08:31'), + ('blue',2,'Larry','lperkins1@toplist.cz','''64.210.133.162''','1978-05-09 04:15:14'), + ('blue',3,'Anna','amontgomery2@miitbeian.gov.cn','''168.104.64.114''','2011-10-16 04:07:57'), + ('blue',4,'Sandra','sgeorge3@livejournal.com','''229.235.252.98''','1973-07-19 10:52:43'), + ('blue',5,'Fred','fwoods4@google.cn','''78.229.170.124''','2012-09-30 16:38:29'), + ('blue',6,'Stephen','shanson5@livejournal.com','''182.227.157.105''','1995-11-07 21:40:50'), + ('blue',7,'William','wmartinez6@upenn.edu','''135.139.249.50''','1982-09-05 03:11:59'), + ('blue',8,'Jessica','jlong7@hao123.com','''203.62.178.210''','1991-10-16 11:03:15'), + ('blue',9,'Douglas','dwhite8@tamu.edu','''178.187.247.1''','1979-10-01 09:49:48'), + ('blue',10,'Lisa','lcoleman9@nydailynews.com','''168.234.128.249''','2011-05-26 07:45:49'), + ('blue',11,'Ralph','rfieldsa@home.pl','''55.152.163.149''','1972-11-18 19:06:11'), + ('blue',12,'Louise','lnicholsb@samsung.com','''141.116.153.154''','2014-11-25 20:56:14'), + ('blue',13,'Clarence','cduncanc@sfgate.com','''81.171.31.133''','2011-11-17 07:02:36'), + ('blue',14,'Daniel','dfranklind@omniture.com','''8.204.211.37''','1980-09-13 00:09:04'), + ('blue',15,'Katherine','klanee@auda.org.au','''176.96.134.59''','1997-08-22 19:36:56'), + ('blue',16,'Billy','bwardf@wikia.com','''214.108.78.85''','2003-10-19 02:14:47'), + ('blue',17,'Annie','agarzag@ocn.ne.jp','''190.108.42.70''','1988-10-28 15:12:35'), + ('blue',18,'Shirley','scolemanh@fastcompany.com','''109.251.164.84''','1988-08-24 10:50:57'), + ('blue',19,'Roger','rfrazieri@scribd.com','''38.145.218.108''','1985-12-31 15:17:15'), + ('blue',20,'Lillian','lstanleyj@goodreads.com','''47.57.236.17''','1970-06-08 02:09:05'), + ('blue',21,'Aaron','arodriguezk@nps.gov','''205.245.118.221''','1985-10-11 23:07:49'), + ('blue',22,'Patrick','pparkerl@techcrunch.com','''19.8.100.182''','2006-03-29 12:53:56'), + ('blue',23,'Phillip','pmorenom@intel.com','''41.38.254.103''','2011-11-07 15:35:43'), + ('blue',24,'Henry','hgarcian@newsvine.com','''1.191.216.252''','2008-08-28 08:30:44'), + ('blue',25,'Irene','iturnero@opera.com','''50.17.60.190''','1994-04-01 07:15:02'), + ('blue',26,'Andrew','adunnp@pen.io','''123.52.253.176''','2000-11-01 06:03:25'), + 
('blue',27,'David','dgutierrezq@wp.com','''238.23.203.42''','1988-01-25 07:29:18'), + ('blue',28,'Henry','hsanchezr@cyberchimps.com','''248.102.2.185''','1983-01-01 13:36:37'), + ('blue',29,'Evelyn','epetersons@gizmodo.com','''32.80.46.119''','1979-07-16 17:24:12'), + ('blue',30,'Tammy','tmitchellt@purevolume.com','''249.246.167.88''','2001-04-03 10:00:23'), + ('blue',31,'Jacqueline','jlittleu@domainmarket.com','''127.181.97.47''','1986-02-11 21:35:50'), + ('blue',32,'Earl','eortizv@opera.com','''166.47.248.240''','1996-07-06 08:16:27'), + ('blue',33,'Juan','jgordonw@sciencedirect.com','''71.77.2.200''','1987-01-31 03:46:44'), + ('blue',34,'Diane','dhowellx@nyu.edu','''140.94.133.12''','1994-06-11 02:30:05'), + ('blue',35,'Randy','rkennedyy@microsoft.com','''73.255.34.196''','2005-05-26 20:28:39'), + ('blue',36,'Janice','jriveraz@time.com','''22.214.227.32''','1990-02-09 04:16:52'), + ('blue',37,'Laura','lperry10@diigo.com','''159.148.145.73''','2015-03-17 05:59:25'), + ('blue',38,'Gary','gray11@statcounter.com','''40.193.124.56''','1970-01-27 10:04:51'), + ('blue',39,'Jesse','jmcdonald12@typepad.com','''31.7.86.103''','2009-03-14 08:14:29'), + ('blue',40,'Sandra','sgonzalez13@goodreads.com','''223.80.168.239''','1993-05-21 14:08:54'), + ('blue',41,'Scott','smoore14@archive.org','''38.238.46.83''','1980-08-30 11:16:56'), + ('blue',42,'Phillip','pevans15@cisco.com','''158.234.59.34''','2011-12-15 23:26:31'), + ('blue',43,'Steven','sriley16@google.ca','''90.247.57.68''','2011-10-29 19:03:28'), + ('blue',44,'Deborah','dbrown17@hexun.com','''179.125.143.240''','1995-04-10 14:36:07'), + ('blue',45,'Lori','lross18@ow.ly','''64.80.162.180''','1980-12-27 16:49:15'), + ('blue',46,'Sean','sjackson19@tumblr.com','''240.116.183.69''','1988-06-12 21:24:45'), + ('blue',47,'Terry','tbarnes1a@163.com','''118.38.213.137''','1997-09-22 16:43:19'), + ('blue',48,'Dorothy','dross1b@ebay.com','''116.81.76.49''','2005-02-28 13:33:24'), + ('blue',49,'Samuel','swashington1c@house.gov','''38.191.253.40''','1989-01-19 21:15:48'), + ('blue',50,'Ralph','rcarter1d@tinyurl.com','''104.84.60.174''','2007-08-11 10:21:49'), + ('green',51,'Wayne','whudson1e@princeton.edu','''90.61.24.102''','1983-07-03 16:58:12'), + ('green',52,'Rose','rjames1f@plala.or.jp','''240.83.81.10''','1995-06-08 11:46:23'), + ('green',53,'Louise','lcox1g@theglobeandmail.com','''105.11.82.145''','2016-09-19 14:45:51'), + ('green',54,'Kenneth','kjohnson1h@independent.co.uk','''139.5.45.94''','1976-08-17 11:26:19'), + ('green',55,'Donna','dbrown1i@amazon.co.uk','''19.45.169.45''','2006-05-27 16:51:40'), + ('green',56,'Johnny','jvasquez1j@trellian.com','''118.202.238.23''','1975-11-17 08:42:32'), + ('green',57,'Patrick','pramirez1k@tamu.edu','''231.25.153.198''','1997-08-06 11:51:09'), + ('green',58,'Helen','hlarson1l@prweb.com','''8.40.21.39''','1993-08-04 19:53:40'), + ('green',59,'Patricia','pspencer1m@gmpg.org','''212.198.40.15''','1977-08-03 16:37:27'), + ('green',60,'Joseph','jspencer1n@marriott.com','''13.15.63.238''','2005-07-23 20:22:06'), + ('green',61,'Phillip','pschmidt1o@blogtalkradio.com','''177.98.201.190''','1976-05-19 21:47:44'), + ('green',62,'Joan','jwebb1p@google.ru','''105.229.170.71''','1972-09-07 17:53:47'), + ('green',63,'Phyllis','pkennedy1q@imgur.com','''35.145.8.244''','2000-01-01 22:33:37'), + ('green',64,'Katherine','khunter1r@smh.com.au','''248.168.205.32''','1991-01-09 06:40:24'), + ('green',65,'Laura','lvasquez1s@wiley.com','''128.129.115.152''','1997-10-23 12:04:56'), + 
('green',66,'Juan','jdunn1t@state.gov','''44.228.124.51''','2004-11-10 05:07:35'), + ('green',67,'Judith','jholmes1u@wiley.com','''40.227.179.115''','1977-08-02 17:01:45'), + ('green',68,'Beverly','bbaker1v@wufoo.com','''208.34.84.59''','2016-03-06 20:07:23'), + ('green',69,'Lawrence','lcarr1w@flickr.com','''59.158.212.223''','1988-09-13 06:07:21'), + ('green',70,'Gloria','gwilliams1x@mtv.com','''245.231.88.33''','1995-03-18 22:32:46'), + ('green',71,'Steven','ssims1y@cbslocal.com','''104.50.58.255''','2001-08-05 21:26:20'), + ('green',72,'Betty','bmills1z@arstechnica.com','''103.177.214.220''','1981-12-14 21:26:54'), + ('green',73,'Mildred','mfuller20@prnewswire.com','''151.158.8.130''','2000-04-19 10:13:55'), + ('green',74,'Donald','dday21@icq.com','''9.178.102.255''','1972-12-03 00:58:24'), + ('green',75,'Eric','ethomas22@addtoany.com','''85.2.241.227''','1992-11-01 05:59:30'), + ('green',76,'Joyce','jarmstrong23@sitemeter.com','''169.224.20.36''','1985-10-24 06:50:01'), + ('green',77,'Maria','mmartinez24@amazonaws.com','''143.189.167.135''','2005-10-05 05:17:42'), + ('green',78,'Harry','hburton25@youtube.com','''156.47.176.237''','1978-03-26 05:53:33'), + ('green',79,'Kevin','klawrence26@hao123.com','''79.136.183.83''','1994-10-12 04:38:52'), + ('green',80,'David','dhall27@prweb.com','''133.149.172.153''','1976-12-15 16:24:24'), + ('green',81,'Kathy','kperry28@twitter.com','''229.242.72.228''','1979-03-04 02:58:56'), + ('green',82,'Adam','aprice29@elegantthemes.com','''13.145.21.10''','1982-11-07 11:46:59'), + ('green',83,'Brandon','bgriffin2a@va.gov','''73.249.128.212''','2013-10-30 05:30:36'), + ('green',84,'Henry','hnguyen2b@discovery.com','''211.36.214.242''','1985-01-09 06:37:27'), + ('green',85,'Eric','esanchez2c@edublogs.org','''191.166.188.251''','2004-05-01 23:21:42'), + ('green',86,'Jason','jlee2d@jimdo.com','''193.92.16.182''','1973-01-08 09:05:39'), + ('green',87,'Diana','drichards2e@istockphoto.com','''19.130.175.245''','1994-10-05 22:50:49'), + ('green',88,'Andrea','awelch2f@abc.net.au','''94.155.233.96''','2002-04-26 08:41:44'), + ('green',89,'Louis','lwagner2g@miitbeian.gov.cn','''26.217.34.111''','2003-08-25 07:56:39'), + ('green',90,'Jane','jsims2h@seesaa.net','''43.4.220.135''','1987-03-20 20:39:04'), + ('green',91,'Larry','lgrant2i@si.edu','''97.126.79.34''','2000-09-07 20:26:19'), + ('green',92,'Louis','ldean2j@prnewswire.com','''37.148.40.127''','2011-09-16 20:12:14'), + ('green',93,'Jennifer','jcampbell2k@xing.com','''38.106.254.142''','1988-07-15 05:06:49'), + ('green',94,'Wayne','wcunningham2l@google.com.hk','''223.28.26.187''','2009-12-15 06:16:54'), + ('green',95,'Lori','lstevens2m@icq.com','''181.250.181.58''','1984-10-28 03:29:19'), + ('green',96,'Judy','jsimpson2n@marriott.com','''180.121.239.219''','1986-02-07 15:18:10'), + ('green',97,'Phillip','phoward2o@usa.gov','''255.247.0.175''','2002-12-26 08:44:45'), + ('green',98,'Gloria','gwalker2p@usa.gov','''156.140.7.128''','1997-10-04 07:58:58'), + ('green',99,'Paul','pjohnson2q@umn.edu','''183.59.198.197''','1991-11-14 12:33:55'), + ('green',100,'Frank','fgreene2r@blogspot.com','''150.143.68.121''','2010-06-12 23:55:39'); diff --git a/test/integration/048_rpc_test/test_rpc.py b/test/integration/048_rpc_test/test_rpc.py new file mode 100644 index 00000000000..eb96678d477 --- /dev/null +++ b/test/integration/048_rpc_test/test_rpc.py @@ -0,0 +1,787 @@ +import json +import multiprocessing +import os +import random +import signal +import socket +import time +from base64 import standard_b64encode as b64 +from 
datetime import datetime + +import requests +from pytest import mark + +from test.integration.base import DBTIntegrationTest, use_profile +from dbt.main import handle_and_check + + +class ServerProcess(multiprocessing.Process): + def __init__(self, port, profiles_dir, cli_vars=None): + self.port = port + handle_and_check_args = [ + '--strict', 'rpc', '--log-cache-events', + '--port', str(self.port), + '--profiles-dir', profiles_dir + ] + if cli_vars: + handle_and_check_args.extend(['--vars', cli_vars]) + super().__init__( + target=handle_and_check, + args=(handle_and_check_args,), + name='ServerProcess') + + def is_up(self): + sock = socket.socket() + try: + sock.connect(('localhost', self.port)) + except socket.error: + return False + sock.close() + result = query_url( + 'http://localhost:{}/jsonrpc'.format(self.port), + {'method': 'status', 'id': 1, 'jsonrpc': 2.0} + ).json() + return result['result']['status'] == 'ready' + + def start(self): + super().start() + for _ in range(10): + if self.is_up(): + break + time.sleep(0.5) + if not self.is_up(): + self.terminate() + raise Exception('server never appeared!') + + +def query_url(url, query): + headers = {'content-type': 'application/json'} + return requests.post(url, headers=headers, data=json.dumps(query)) + + +class BackgroundQueryProcess(multiprocessing.Process): + def __init__(self, query, url, group=None, name=None): + parent, child = multiprocessing.Pipe() + self.parent_pipe = parent + self.child_pipe = child + self.query = query + self.url = url + super().__init__(group=group, name=name) + + def run(self): + try: + result = query_url(self.url, self.query).json() + except Exception as exc: + self.child_pipe.send(('error', str(exc))) + else: + self.child_pipe.send(('result', result)) + + def wait_result(self): + result_type, result = self.parent_pipe.recv() + self.join() + if result_type == 'error': + raise Exception(result) + else: + return result + + +_select_from_ephemeral = '''with __dbt__CTE__ephemeral_model as ( + + +select 1 as id +)select * from __dbt__CTE__ephemeral_model''' + + +def addr_in_use(err, *args): + msg = str(err) + if 'Address already in use' in msg: + return True + if 'server never appeared!' 
in msg: + return True # this can happen because of the above + return False + + +@mark.flaky(rerun_filter=addr_in_use) +class TestRPCServer(DBTIntegrationTest): + def setUp(self): + super().setUp() + os.environ['DBT_TEST_SCHEMA_NAME_VARIABLE'] = 'test_run_schema' + self.run_dbt_with_vars(['seed'], strict=False) + port = random.randint(20000, 65535) + self._server = ServerProcess( + cli_vars='{{test_run_schema: {}}}'.format(self.unique_schema()), + profiles_dir=self.test_root_dir, + port=port + ) + self._server.start() + self.background_queries = [] + + def tearDown(self): + del os.environ['DBT_TEST_SCHEMA_NAME_VARIABLE'] + self._server.terminate() + for query in self.background_queries: + query.terminate() + super().tearDown() + + @property + def schema(self): + return "rpc_048" + + @property + def models(self): + return "models" + + def run_dbt_with_vars(self, cmd, *args, **kwargs): + cmd.extend(['--vars', + '{{test_run_schema: {}}}'.format(self.unique_schema())]) + return self.run_dbt(cmd, *args, **kwargs) + + @property + def project_config(self): + return { + 'data-paths': ['data'], + 'quoting': {'database': True, 'schema': True, 'identifier': True}, + 'macro-paths': ['macros'], + } + + def build_query( + self, method, kwargs, sql=None, test_request_id=1, macros=None + ): + body_data = '' + if sql is not None: + body_data += sql + + if macros is not None: + body_data += macros + + if sql is not None or macros is not None: + kwargs['sql'] = b64(body_data.encode('utf-8')).decode('utf-8') + + return { + 'jsonrpc': '2.0', + 'method': method, + 'params': kwargs, + 'id': test_request_id + } + + @property + def url(self): + return 'http://localhost:{}/jsonrpc'.format(self._server.port) + + def query(self, _method, _sql=None, _test_request_id=1, macros=None, **kwargs): + built = self.build_query(_method, kwargs, _sql, _test_request_id, macros) + return query_url(self.url, built) + + def handle_result(self, bg_query, pipe): + result_type, result = pipe.recv() + bg_query.join() + if result_type == 'error': + raise result + else: + return result + + def background_query( + self, _method, _sql=None, _test_request_id=1, _block=False, macros=None, **kwargs + ): + built = self.build_query(_method, kwargs, _sql, _test_request_id, + macros) + + url = 'http://localhost:{}/jsonrpc'.format(self._server.port) + name = _method + if 'name' in kwargs: + name += ' ' + kwargs['name'] + bg_query = BackgroundQueryProcess(built, url, name=name) + self.background_queries.append(bg_query) + bg_query.start() + return bg_query + + def assertResultHasTimings(self, result, *names): + self.assertIn('timing', result) + timings = result['timing'] + self.assertEqual(len(timings), len(names)) + for expected_name, timing in zip(names, timings): + self.assertIn('name', timing) + self.assertEqual(timing['name'], expected_name) + self.assertIn('started_at', timing) + self.assertIn('completed_at', timing) + datetime.strptime(timing['started_at'], '%Y-%m-%dT%H:%M:%S.%fZ') + datetime.strptime(timing['completed_at'], '%Y-%m-%dT%H:%M:%S.%fZ') + + def assertIsResult(self, data, id_=1): + self.assertEqual(data['id'], id_) + self.assertEqual(data['jsonrpc'], '2.0') + self.assertIn('result', data) + self.assertNotIn('error', data) + return data['result'] + + def assertIsError(self, data, id_=1): + self.assertEqual(data['id'], id_) + self.assertEqual(data['jsonrpc'], '2.0') + self.assertIn('error', data) + self.assertNotIn('result', data) + return data['error'] + + def assertIsErrorWithCode(self, data, code, id_=1): + error = 
self.assertIsError(data, id_) + self.assertIn('code', error) + self.assertIn('message', error) + self.assertEqual(error['code'], code) + return error + + def assertIsErrorWith(self, data, code, message, error_data): + error = self.assertIsErrorWithCode(data, code) + if message is not None: + self.assertEqual(error['message'], message) + + if error_data is not None: + return self.assertHasErrorData(error, error_data) + else: + return error.get('data') + + def assertResultHasSql(self, data, raw_sql, compiled_sql=None): + if compiled_sql is None: + compiled_sql = raw_sql + result = self.assertIsResult(data) + self.assertIn('logs', result) + self.assertTrue(len(result['logs']) > 0) + self.assertIn('raw_sql', result) + self.assertIn('compiled_sql', result) + self.assertEqual(result['raw_sql'], raw_sql) + self.assertEqual(result['compiled_sql'], compiled_sql) + return result + + def assertSuccessfulCompilationResult(self, data, raw_sql, compiled_sql=None): + result = self.assertResultHasSql(data, raw_sql, compiled_sql) + self.assertNotIn('table', result) + # compile results still have an 'execute' timing, it just represents + # the time to construct a result object. + self.assertResultHasTimings(result, 'compile', 'execute') + + def assertSuccessfulRunResult(self, data, raw_sql, compiled_sql=None, table=None): + result = self.assertResultHasSql(data, raw_sql, compiled_sql) + self.assertIn('table', result) + if table is not None: + self.assertEqual(result['table'], table) + self.assertResultHasTimings(result, 'compile', 'execute') + + @use_profile('postgres') + def test_compile_postgres(self): + trivial = self.query( + 'compile', + 'select 1 as id', + name='foo' + ).json() + self.assertSuccessfulCompilationResult( + trivial, 'select 1 as id' + ) + + ref = self.query( + 'compile', + 'select * from {{ ref("descendant_model") }}', + name='foo' + ).json() + self.assertSuccessfulCompilationResult( + ref, + 'select * from {{ ref("descendant_model") }}', + compiled_sql='select * from "{}"."{}"."descendant_model"'.format( + self.default_database, + self.unique_schema()) + ) + + source = self.query( + 'compile', + 'select * from {{ source("test_source", "test_table") }}', + name='foo' + ).json() + self.assertSuccessfulCompilationResult( + source, + 'select * from {{ source("test_source", "test_table") }}', + compiled_sql='select * from "{}"."{}"."source"'.format( + self.default_database, + self.unique_schema()) + ) + + macro = self.query( + 'compile', + 'select {{ my_macro() }}', + name='foo', + macros='{% macro my_macro() %}1 as id{% endmacro %}' + ).json() + self.assertSuccessfulCompilationResult( + macro, + 'select {{ my_macro() }}', + compiled_sql='select 1 as id' + ) + + macro_override = self.query( + 'compile', + 'select {{ happy_little_macro() }}', + name='foo', + macros='{% macro override_me() %}2 as id{% endmacro %}' + ).json() + self.assertSuccessfulCompilationResult( + macro_override, + 'select {{ happy_little_macro() }}', + compiled_sql='select 2 as id' + ) + + macro_override_with_if_statement = self.query( + 'compile', + '{% if True %}select {{ happy_little_macro() }}{% endif %}', + name='foo', + macros='{% macro override_me() %}2 as id{% endmacro %}' + ).json() + self.assertSuccessfulCompilationResult( + macro_override_with_if_statement, + '{% if True %}select {{ happy_little_macro() }}{% endif %}', + compiled_sql='select 2 as id' + ) + + ephemeral = self.query( + 'compile', + 'select * from {{ ref("ephemeral_model") }}', + name='foo' + ).json() + self.assertSuccessfulCompilationResult( + 
ephemeral, + 'select * from {{ ref("ephemeral_model") }}', + compiled_sql=_select_from_ephemeral + ) + + @use_profile('postgres') + def test_run_postgres(self): + # seed + run dbt to make models before using them! + self.run_dbt_with_vars(['seed']) + self.run_dbt_with_vars(['run']) + data = self.query( + 'run', + 'select 1 as id', + name='foo' + ).json() + self.assertSuccessfulRunResult( + data, 'select 1 as id', table={'column_names': ['id'], 'rows': [[1.0]]} + ) + + ref = self.query( + 'run', + 'select * from {{ ref("descendant_model") }} order by updated_at limit 1', + name='foo' + ).json() + self.assertSuccessfulRunResult( + ref, + 'select * from {{ ref("descendant_model") }} order by updated_at limit 1', + compiled_sql='select * from "{}"."{}"."descendant_model" order by updated_at limit 1'.format( + self.default_database, + self.unique_schema()), + table={ + 'column_names': ['favorite_color', 'id', 'first_name', 'email', 'ip_address', 'updated_at'], + 'rows': [['blue', 38.0, 'Gary', 'gray11@statcounter.com', "'40.193.124.56'", '1970-01-27T10:04:51']], + } + ) + + source = self.query( + 'run', + 'select * from {{ source("test_source", "test_table") }} order by updated_at limit 1', + name='foo' + ).json() + self.assertSuccessfulRunResult( + source, + 'select * from {{ source("test_source", "test_table") }} order by updated_at limit 1', + compiled_sql='select * from "{}"."{}"."source" order by updated_at limit 1'.format( + self.default_database, + self.unique_schema()), + table={ + 'column_names': ['favorite_color', 'id', 'first_name', 'email', 'ip_address', 'updated_at'], + 'rows': [['blue', 38.0, 'Gary', 'gray11@statcounter.com', "'40.193.124.56'", '1970-01-27T10:04:51']], + } + ) + + macro = self.query( + 'run', + 'select {{ my_macro() }}', + name='foo', + macros='{% macro my_macro() %}1 as id{% endmacro %}' + ).json() + self.assertSuccessfulRunResult( + macro, + raw_sql='select {{ my_macro() }}', + compiled_sql='select 1 as id', + table={'column_names': ['id'], 'rows': [[1.0]]} + ) + + macro_override = self.query( + 'run', + 'select {{ happy_little_macro() }}', + name='foo', + macros='{% macro override_me() %}2 as id{% endmacro %}' + ).json() + self.assertSuccessfulRunResult( + macro_override, + raw_sql='select {{ happy_little_macro() }}', + compiled_sql='select 2 as id', + table={'column_names': ['id'], 'rows': [[2.0]]} + ) + + macro_override_with_if_statement = self.query( + 'run', + '{% if True %}select {{ happy_little_macro() }}{% endif %}', + name='foo', + macros='{% macro override_me() %}2 as id{% endmacro %}' + ).json() + self.assertSuccessfulRunResult( + macro_override_with_if_statement, + '{% if True %}select {{ happy_little_macro() }}{% endif %}', + compiled_sql='select 2 as id', + table={'column_names': ['id'], 'rows': [[2.0]]} + ) + + macro_with_raw_statement = self.query( + 'run', + '{% raw %}select 1 as{% endraw %}{{ test_macros() }}{% macro test_macros() %} id{% endmacro %}', + name='foo' + ).json() + self.assertSuccessfulRunResult( + macro_with_raw_statement, + '{% raw %}select 1 as{% endraw %}{{ test_macros() }}', + compiled_sql='select 1 as id', + table={'column_names': ['id'], 'rows': [[1.0]]} + ) + + macro_with_comment = self.query( + 'run', + '{% raw %}select 1 {% endraw %}{{ test_macros() }} {# my comment #}{% macro test_macros() -%} as{% endmacro %} id{# another comment #}', + name='foo' + ).json() + self.assertSuccessfulRunResult( + macro_with_comment, + '{% raw %}select 1 {% endraw %}{{ test_macros() }} {# my comment #} id{# another comment #}', + 
compiled_sql='select 1 as id', + table={'column_names': ['id'], 'rows': [[1.0]]} + ) + + ephemeral = self.query( + 'run', + 'select * from {{ ref("ephemeral_model") }}', + name='foo' + ).json() + self.assertSuccessfulRunResult( + ephemeral, + raw_sql='select * from {{ ref("ephemeral_model") }}', + compiled_sql=_select_from_ephemeral, + table={'column_names': ['id'], 'rows': [[1.0]]} + ) + + @mark.skipif(os.name == 'nt', reason='"kill" not supported on windows') + @mark.flaky(rerun_filter=None) + @use_profile('postgres') + def test_ps_kill_postgres(self): + done_query = self.query('compile', 'select 1 as id', name='done').json() + self.assertIsResult(done_query) + pg_sleeper, sleep_task_id, request_id = self._get_sleep_query() + + empty_ps_result = self.query('ps', completed=False, active=False).json() + result = self.assertIsResult(empty_ps_result) + self.assertEqual(len(result['rows']), 0) + + sleeper_ps_result = self.query('ps', completed=False, active=True).json() + result = self.assertIsResult(sleeper_ps_result) + self.assertEqual(len(result['rows']), 1) + rowdict = result['rows'] + self.assertEqual(rowdict[0]['request_id'], request_id) + self.assertEqual(rowdict[0]['method'], 'run') + self.assertEqual(rowdict[0]['state'], 'running') + self.assertEqual(rowdict[0]['timeout'], None) + + complete_ps_result = self.query('ps', completed=True, active=False).json() + result = self.assertIsResult(complete_ps_result) + self.assertEqual(len(result['rows']), 1) + rowdict = result['rows'] + self.assertEqual(rowdict[0]['request_id'], 1) + self.assertEqual(rowdict[0]['method'], 'compile') + self.assertEqual(rowdict[0]['state'], 'finished') + self.assertEqual(rowdict[0]['timeout'], None) + + all_ps_result = self.query('ps', completed=True, active=True).json() + result = self.assertIsResult(all_ps_result) + self.assertEqual(len(result['rows']), 2) + rowdict = result['rows'] + rowdict.sort(key=lambda r: r['start']) + self.assertEqual(rowdict[0]['request_id'], 1) + self.assertEqual(rowdict[0]['method'], 'compile') + self.assertEqual(rowdict[0]['state'], 'finished') + self.assertEqual(rowdict[0]['timeout'], None) + self.assertEqual(rowdict[1]['request_id'], request_id) + self.assertEqual(rowdict[1]['method'], 'run') + self.assertEqual(rowdict[1]['state'], 'running') + self.assertEqual(rowdict[1]['timeout'], None) + + self.kill_and_assert(pg_sleeper, sleep_task_id, request_id) + + def kill_and_assert(self, pg_sleeper, task_id, request_id): + kill_result = self.query('kill', task_id=task_id).json() + kill_time = time.time() + result = self.assertIsResult(kill_result) + self.assertTrue(result['killed']) + + sleeper_result = pg_sleeper.wait_result() + result_time = time.time() + error = self.assertIsErrorWithCode(sleeper_result, 10009, request_id) + self.assertEqual(error['message'], 'RPC process killed') + self.assertIn('data', error) + error_data = error['data'] + self.assertEqual(error_data['signum'], 2) + self.assertEqual(error_data['message'], 'RPC process killed by signal 2') + self.assertIn('logs', error_data) + return error_data + + def _get_sleep_query(self, request_id=90890, duration=15): + pg_sleeper = self.background_query( + 'run', + 'select pg_sleep({})'.format(duration), + _test_request_id=request_id, + name='sleeper', + ) + + for _ in range(20): + time.sleep(0.2) + sleeper_ps_result = self.query('ps', completed=False, active=True).json() + result = self.assertIsResult(sleeper_ps_result) + rows = result['rows'] + for row in rows: + if row['request_id'] == request_id and row['state'] == 
'running': + return pg_sleeper, row['task_id'], request_id + + self.assertTrue(False, 'request ID never found running!') + + @mark.skipif(os.name == 'nt', reason='"kill" not supported on windows') + @mark.flaky(rerun_filter=lambda *a, **kw: True) + @use_profile('postgres') + def test_ps_kill_longwait_postgres(self): + pg_sleeper, sleep_task_id, request_id = self._get_sleep_query() + + # the test above frequently kills the process during parsing of the + # requested node. That's also a useful test, but we should test that + # we cancel the in-progress sleep query. + time.sleep(3) + + error_data = self.kill_and_assert(pg_sleeper, sleep_task_id, request_id) + # we should have logs if we did anything + self.assertTrue(len(error_data['logs']) > 0) + + @use_profile('postgres') + def test_invalid_requests_postgres(self): + data = self.query( + 'xxxxxnotamethodxxxxx', + 'hi this is not sql' + ).json() + self.assertIsErrorWith(data, -32601, 'Method not found', None) + + data = self.query( + 'compile', + 'select * from {{ reff("nonsource_descendant") }}', + name='mymodel' + ).json() + error_data = self.assertIsErrorWith(data, 10004, 'Compilation Error', { + 'type': 'CompilationException', + 'message': "Compilation Error in rpc mymodel (from remote system)\n 'reff' is undefined", + 'compiled_sql': None, + 'raw_sql': 'select * from {{ reff("nonsource_descendant") }}', + }) + self.assertIn('logs', error_data) + self.assertTrue(len(error_data['logs']) > 0) + + data = self.query( + 'run', + 'hi this is not sql', + name='foo' + ).json() + error_data = self.assertIsErrorWith(data, 10003, 'Database Error', { + 'type': 'DatabaseException', + 'message': 'Database Error in rpc foo (from remote system)\n syntax error at or near "hi"\n LINE 1: hi this is not sql\n ^', + 'compiled_sql': 'hi this is not sql', + 'raw_sql': 'hi this is not sql', + }) + self.assertIn('logs', error_data) + self.assertTrue(len(error_data['logs']) > 0) + + macro_no_override = self.query( + 'run', + 'select {{ happy_little_macro() }}', + name='foo', + ).json() + error_data = self.assertIsErrorWith(macro_no_override, 10004, 'Compilation Error', { + 'type': 'CompilationException', + 'raw_sql': 'select {{ happy_little_macro() }}', + 'compiled_sql': None + }) + self.assertIn('logs', error_data) + self.assertTrue(len(error_data['logs']) > 0) + + def assertHasErrorData(self, error, expected_error_data): + self.assertIn('data', error) + error_data = error['data'] + for key, value in expected_error_data.items(): + self.assertIn(key, error_data) + self.assertEqual(error_data[key], value) + return error_data + + @use_profile('postgres') + def test_timeout_postgres(self): + data = self.query( + 'run', + 'select from pg_sleep(5)', + name='foo', + timeout=1 + ).json() + error = self.assertIsErrorWithCode(data, 10008) + self.assertEqual(error['message'], 'RPC timeout error') + self.assertIn('data', error) + error_data = error['data'] + self.assertIn('timeout', error_data) + self.assertEqual(error_data['timeout'], 1) + self.assertIn('message', error_data) + self.assertEqual(error_data['message'], 'RPC timed out after 1s') + self.assertIn('logs', error_data) + # on windows, process start is so slow that frequently we won't have collected any logs + if os.name != 'nt': + self.assertTrue(len(error_data['logs']) > 0) + + @use_profile('postgres') + def test_seed_project_postgres(self): + # testing "dbt seed" is tricky so we'll just jam some sql in there + self.run_sql_file("seed.sql") + result = self.query('seed_project', show=True).json() + dct = 
self.assertIsResult(result) + self.assertTablesEqual('source', 'seed_expected') + self.assertIn('results', dct) + results = dct['results'] + self.assertEqual(len(results), 4) + self.assertEqual( + set(r['node']['name'] for r in results), + {'expected_multi_source', 'other_source_table', 'other_table', 'source'} + ) + + @use_profile('postgres') + def test_compile_project_postgres(self): + self.run_dbt_with_vars(['seed']) + result = self.query('compile_project').json() + dct = self.assertIsResult(result) + self.assertIn('results', dct) + results = dct['results'] + self.assertEqual(len(results), 11) + compiled = set(r['node']['name'] for r in results) + self.assertTrue(compiled.issuperset( + {'descendant_model', 'multi_source_model', 'nonsource_descendant'} + )) + self.assertNotIn('ephemeral_model', compiled) + + @use_profile('postgres') + def test_run_project_postgres(self): + self.run_dbt_with_vars(['seed']) + result = self.query('run_project').json() + dct = self.assertIsResult(result) + self.assertIn('results', dct) + results = dct['results'] + self.assertEqual(len(results), 3) + self.assertEqual( + set(r['node']['name'] for r in results), + {'descendant_model', 'multi_source_model', 'nonsource_descendant'} + ) + self.assertTablesEqual('multi_source_model', 'expected_multi_source') + + @use_profile('postgres') + def test_test_project_postgres(self): + self.run_dbt_with_vars(['seed']) + result = self.query('run_project').json() + dct = self.assertIsResult(result) + result = self.query('test_project').json() + dct = self.assertIsResult(result) + self.assertIn('results', dct) + results = dct['results'] + self.assertEqual(len(results), 4) + for result in results: + self.assertEqual(result['status'], 0.0) + self.assertNotIn('fail', result) + + def _wait_for_running(self, timeout=15, raise_on_timeout=True): + started = time.time() + time.sleep(0.5) + elapsed = time.time() - started + + while elapsed < timeout: + status = self.assertIsResult(self.query('status').json()) + if status['status'] == 'running': + return status + time.sleep(0.5) + elapsed = time.time() - started + + status = self.assertIsResult(self.query('status').json()) + if raise_on_timeout: + self.assertEqual( + status['status'], + 'ready', + f'exceeded max time of {timeout}: {elapsed} seconds elapsed' + ) + return status + + def assertRunning(self, sleepers): + sleeper_ps_result = self.query('ps', completed=False, active=True).json() + result = self.assertIsResult(sleeper_ps_result) + self.assertEqual(len(result['rows']), len(sleepers)) + result_map = {rd['request_id']: rd for rd in result['rows']} + for _, _, request_id in sleepers: + found = result_map[request_id] + self.assertEqual(found['request_id'], request_id) + self.assertEqual(found['method'], 'run') + self.assertEqual(found['state'], 'running') + self.assertEqual(found['timeout'], None) + + def _add_command(self, cmd, id_): + self.assertIsResult(self.query(cmd, _test_request_id=id_).json(), id_=id_) + + @mark.skipif(os.name == 'nt', reason='"sighup" not supported on windows') + @mark.flaky(rerun_filter=lambda *a, **kw: True) + @use_profile('postgres') + def test_sighup_postgres(self): + status = self.assertIsResult(self.query('status').json()) + self.assertEqual(status['status'], 'ready') + started_at = status['timestamp'] + + done_query = self.query('compile', 'select 1 as id', name='done').json() + self.assertIsResult(done_query) + sleepers = [] + command_ids = [] + + sleepers.append(self._get_sleep_query(1000, duration=60)) + self.assertRunning(sleepers) + + 
self._add_command('seed_project', 20)
+        command_ids.append(20)
+        self._add_command('run_project', 21)
+        command_ids.append(21)
+
+        # sighup a few times
+        for _ in range(10):
+            os.kill(status['pid'], signal.SIGHUP)
+
+        status = self._wait_for_running()
+
+        # we should still see our service:
+        self.assertRunning(sleepers)
+
+        self._add_command('seed_project', 30)
+        command_ids.append(30)
+        self._add_command('run_project', 31)
+        command_ids.append(31)
+
+        # start a new one too
+        sleepers.append(self._get_sleep_query(1001, duration=60))
+
+        # now we should see both
+        self.assertRunning(sleepers)
+
+        # now pluck out the oldest one and kill it
+        dead, alive = sleepers
+        self.kill_and_assert(*dead)
+        self.assertRunning([alive])
+        self.kill_and_assert(*alive)