diff --git a/core/dbt/parser/schemas.py b/core/dbt/parser/schemas.py
index dd05dffa1d6..ec752b809ca 100644
--- a/core/dbt/parser/schemas.py
+++ b/core/dbt/parser/schemas.py
@@ -303,7 +303,7 @@ def parse_test(
                 '\n\t{}\n\t@: {}'
                 .format(block.path.original_file_path, exc.msg, context)
             )
-            raise CompilationException(msg)
+            raise CompilationException(msg) from exc
 
     def _calculate_freshness(
         self,
diff --git a/core/dbt/rpc/task_handler.py b/core/dbt/rpc/task_handler.py
index de5bd792558..ff0b08c5b48 100644
--- a/core/dbt/rpc/task_handler.py
+++ b/core/dbt/rpc/task_handler.py
@@ -146,6 +146,11 @@ def set_parse_state_with(
     except Exception as exc:
         log_dicts = [r.to_dict() for r in logs()]
         manager.set_compile_exception(exc, logs=log_dicts)
+        # re-raise to ensure any exception handlers above trigger. We might be
+        # in an API call that set the parse state, in which case we don't want
+        # to swallow the exception - it also should report its failure to the
+        # task manager.
+        raise
     else:
         log_dicts = [r.to_dict() for r in logs()]
         manager.set_ready(log_dicts)
diff --git a/core/dbt/task/rpc/deps.py b/core/dbt/task/rpc/deps.py
index 41cf101bbd9..b6506434a45 100644
--- a/core/dbt/task/rpc/deps.py
+++ b/core/dbt/task/rpc/deps.py
@@ -1,10 +1,10 @@
 import os
 import shutil
-from .cli import HasCLI
 from dbt.contracts.rpc import (
     RPCNoParameters, RemoteEmptyResult, RemoteMethodFlags,
 )
+from dbt.rpc.method import RemoteMethod
 from dbt.task.deps import DepsTask
 
 
@@ -15,7 +15,10 @@ def _clean_deps(config):
         os.makedirs(modules_dir)
 
 
-class RemoteDepsTask(HasCLI[RPCNoParameters, RemoteEmptyResult], DepsTask):
+class RemoteDepsTask(
+    RemoteMethod[RPCNoParameters, RemoteEmptyResult],
+    DepsTask,
+):
     METHOD_NAME = 'deps'
 
     def get_flags(self) -> RemoteMethodFlags:
diff --git a/test/integration/048_rpc_test/test_rpc.py b/test/integration/048_rpc_test/test_rpc.py
index 4c28cae95e0..2f01cccbe0a 100644
--- a/test/integration/048_rpc_test/test_rpc.py
+++ b/test/integration/048_rpc_test/test_rpc.py
@@ -949,6 +949,7 @@ def test_docs_generate_postgres_cli(self):
     def test_deps_postgres(self):
         self.async_query('deps').json()
 
+    @mark.skip(reason='cli_args + deps not supported for now')
     @use_profile('postgres')
     def test_deps_postgres_cli(self):
         self.async_query('cli_args', cli='deps').json()
@@ -1056,43 +1057,6 @@ def test_gc_by_id_postgres(self):
         result = self.assertIsResult(resp)
         self.assertEqual(len(result['rows']), 0)
 
-    @use_profile('postgres')
-    def test_postgres_gc_change_interval(self):
-        num_requests = 10
-        self.make_many_requests(num_requests)
-
-        # all present
-        resp = self.query('ps', completed=True, active=True).json()
-        result = self.assertIsResult(resp)
-        self.assertEqual(len(result['rows']), num_requests)
-
-        resp = self.query('gc', settings=dict(maxsize=1000, reapsize=5, auto_reap_age=0.1)).json()
-        result = self.assertIsResult(resp)
-        self.assertEqual(len(result['deleted']), 0)
-        self.assertEqual(len(result['missing']), 0)
-        self.assertEqual(len(result['running']), 0)
-        time.sleep(0.5)
-
-        # all cleared up
-        test_resp = self.query('ps', completed=True, active=True)
-        resp = test_resp.json()
-        result = self.assertIsResult(resp)
-        self.assertEqual(len(result['rows']), 0)
-
-        resp = self.query('gc', settings=dict(maxsize=2, reapsize=5, auto_reap_age=10000)).json()
-        result = self.assertIsResult(resp)
-        self.assertEqual(len(result['deleted']), 0)
-        self.assertEqual(len(result['missing']), 0)
-        self.assertEqual(len(result['running']), 0)
-
-        # make more requests
-        stored = self.make_many_requests(num_requests)
-        time.sleep(0.5)
-        # there should be 2 left!
-        resp = self.query('ps', completed=True, active=True).json()
-        result = self.assertIsResult(resp)
-        self.assertEqual(len(result['rows']), 2)
-
 
 class CompletingServerProcess(ServerProcess):
     def _compare_result(self, result):
@@ -1155,6 +1119,7 @@ def test_deps_compilation_postgres(self):
 
         self._check_deps_ok(status)
 
+    @mark.skip(reason='cli_args + deps not supported for now')
     @use_profile('postgres')
     def test_deps_cli_compilation_postgres(self):
         status = self._check_start_predeps()
@@ -1163,50 +1128,3 @@ def test_deps_cli_compilation_postgres(self):
         self.assertIsResult(self.async_query('cli_args', cli='deps', _poll_timeout=120).json())
 
         self._check_deps_ok(status)
-
-
-class FailedServerProcess(ServerProcess):
-    def _compare_result(self, result):
-        return result['result']['status'] == 'error'
-
-
-@mark.flaky(rerun_filter=addr_in_use)
-class TestRPCServerFailed(HasRPCServer):
-    ServerProcess = FailedServerProcess
-    should_seed = False
-
-    @property
-    def models(self):
-        return "malformed_models"
-
-    def tearDown(self):
-        # prevent an OperationalError where the server closes on us in the
-        # background
-        self.adapter.cleanup_connections()
-        super().tearDown()
-
-    @use_profile('postgres')
-    def test_postgres_status_error(self):
-        status = self.assertIsResult(self.query('status').json())
-        self.assertEqual(status['status'], 'error')
-        self.assertIn('logs', status)
-        logs = status['logs']
-        self.assertTrue(len(logs) > 0)
-        for key in ('message', 'timestamp', 'levelname', 'level'):
-            self.assertIn(key, logs[0])
-        self.assertIn('pid', status)
-        self.assertEqual(self._server.pid, status['pid'])
-        self.assertIn('error', status)
-        self.assertIn('message', status['error'])
-
-        compile_result = self.query('compile_sql', 'select 1 as id').json()
-        data = self.assertIsErrorWith(
-            compile_result,
-            10011,
-            'RPC server failed to compile project, call the "status" method for compile status',
-            None)
-        self.assertIn('message', data)
-        self.assertIn('Invalid test config', str(data['message']))
-
-        # deps should work
-        self.assertIsResult(self.async_query('deps', _poll_timeout=120).json())
diff --git a/test/rpc/__init__.py b/test/rpc/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/test/rpc/fixtures.py b/test/rpc/fixtures.py
new file mode 100644
index 00000000000..a58a9f4dfb7
--- /dev/null
+++ b/test/rpc/fixtures.py
@@ -0,0 +1,533 @@
+import base64
+import json
+import os
+import pytest
+import random
+import signal
+import socket
+import time
+from contextlib import contextmanager
+from typing import Dict, Any, Optional, Union, List
+
+import requests
+import yaml
+
+import dbt.flags
+from dbt.adapters.factory import get_adapter, register_adapter
+from dbt.logger import log_manager
+from dbt.main import handle_and_check
+from dbt.config import RuntimeConfig
+
+
+def query_url(url, query: Dict[str, Any]):
+    headers = {'content-type': 'application/json'}
+    return requests.post(url, headers=headers, data=json.dumps(query))
+
+
+class NoServerException(Exception):
+    pass
+
+
+class ServerProcess(dbt.flags.MP_CONTEXT.Process):
+    def __init__(self, cwd, port, profiles_dir, cli_vars=None, criteria=('ready',)):
+        self.cwd = cwd
+        self.port = port
+        self.criteria = criteria
+        self.error = None
+        handle_and_check_args = [
+            '--strict', 'rpc', '--log-cache-events',
+            '--port', str(self.port),
+            '--profiles-dir', profiles_dir
+        ]
+        if cli_vars:
+            handle_and_check_args.extend(['--vars', cli_vars])
+        super().__init__(
+            target=handle_and_check,
+            args=(handle_and_check_args,),
+            name='ServerProcess')
+
+    def run(self):
+        os.chdir(self.cwd)
+        log_manager.reset_handlers()
+        # run server tests in stderr mode
+        log_manager.stderr_console()
+        return super().run()
+
+    def can_connect(self):
+        sock = socket.socket()
+        try:
+            sock.connect(('localhost', self.port))
+        except socket.error:
+            return False
+        sock.close()
+        return True
+
+    def _compare_result(self, result):
+        return result['result']['status'] in self.criteria
+
+    def status_ok(self):
+        result = self.query(
+            {'method': 'status', 'id': 1, 'jsonrpc': '2.0'}
+        ).json()
+        return self._compare_result(result)
+
+    def is_up(self):
+        if not self.can_connect():
+            return False
+        return self.status_ok()
+
+    def start(self):
+        super().start()
+        for _ in range(30):
+            if self.is_up():
+                break
+            time.sleep(0.5)
+        if not self.can_connect():
+            raise NoServerException('server never appeared!')
+        status_result = self.query(
+            {'method': 'status', 'id': 1, 'jsonrpc': '2.0'}
+        ).json()
+        if not self._compare_result(status_result):
+            raise NoServerException(
+                'Got invalid status result: {}'.format(status_result)
+            )
+
+    @property
+    def url(self):
+        return 'http://localhost:{}/jsonrpc'.format(self.port)
+
+    def query(self, query):
+        headers = {'content-type': 'application/json'}
+        return requests.post(self.url, headers=headers, data=json.dumps(query))
+
+
+class Querier:
+    def __init__(self, server: ServerProcess):
+        self.server = server
+
+    def build_request_data(self, method, params, request_id):
+        return {
+            'jsonrpc': '2.0',
+            'method': method,
+            'params': params,
+            'id': request_id,
+        }
+
+    def request(self, method, params=None, request_id=1):
+        if params is None:
+            params = {}
+
+        data = self.build_request_data(
+            method=method, params=params, request_id=request_id
+        )
+        response = self.server.query(data)
+        assert response.ok, f'invalid response from server: {response.text}'
+        return response.json()
+
+    def status(self, request_id: int = 1):
+        return self.request(method='status', request_id=request_id)
+
+    def ps(self, active=True, completed=False, request_id=1):
+        params = {}
+        if active is not None:
+            params['active'] = active
+        if completed is not None:
+            params['completed'] = completed
+
+        return self.request(method='ps', params=params, request_id=request_id)
+
+    def kill(self, task_id: str, request_id: int = 1):
+        params = {'task_id': task_id}
+        return self.request(
+            method='kill', params=params, request_id=request_id
+        )
+
+    def poll(
+        self,
+        request_token: str,
+        logs: bool = False,
+        logs_start: int = 0,
+        request_id: int = 1,
+    ):
+        params = {
+            'request_token': request_token,
+        }
+        if logs is not None:
+            params['logs'] = logs
+        if logs_start is not None:
+            params['logs_start'] = logs_start
+        return self.request(
+            method='poll', params=params, request_id=request_id
+        )
+
+    def gc(
+        self,
+        task_ids: Optional[List[str]] = None,
+        before: Optional[str] = None,
+        settings: Optional[Dict[str, Any]] = None,
+        request_id: int = 1,
+    ):
+        params = {}
+        if task_ids is not None:
+            params['task_ids'] = task_ids
+        if before is not None:
+            params['before'] = before
+        if settings is not None:
+            params['settings'] = settings
+        return self.request(
+            method='gc', params=params, request_id=request_id
+        )
+
+    def cli_args(self, cli: str, request_id: int = 1):
+        return self.request(
+            method='cli_args', params={'cli': cli}, request_id=request_id
+        )
+
+    def deps(self, request_id: int = 1):
+        return self.request(method='deps', request_id=request_id)
+
+    def compile(
+        self,
+        models: Optional[Union[str, List[str]]] = None,
+        exclude: Optional[Union[str, List[str]]] = None,
+        request_id: int = 1,
+    ):
+        params = {}
+        if models is not None:
+            params['models'] = models
+        if exclude is not None:
+            params['exclude'] = exclude
+        return self.request(
+            method='compile', params=params, request_id=request_id
+        )
+
+    def run(
+        self,
+        models: Optional[Union[str, List[str]]] = None,
+        exclude: Optional[Union[str, List[str]]] = None,
+        request_id: int = 1,
+    ):
+        params = {}
+        if models is not None:
+            params['models'] = models
+        if exclude is not None:
+            params['exclude'] = exclude
+        return self.request(
+            method='run', params=params, request_id=request_id
+        )
+
+    def seed(self, show: bool = None, request_id: int = 1):
+        params = {}
+        if show is not None:
+            params['show'] = show
+        return self.request(
+            method='seed', params=params, request_id=request_id
+        )
+
+    def test(
+        self,
+        models: Optional[Union[str, List[str]]] = None,
+        exclude: Optional[Union[str, List[str]]] = None,
+        data: bool = None,
+        schema: bool = None,
+        request_id: int = 1,
+    ):
+        params = {}
+        if models is not None:
+            params['models'] = models
+        if exclude is not None:
+            params['exclude'] = exclude
+        if data is not None:
+            params['data'] = data
+        if schema is not None:
+            params['schema'] = schema
+        return self.request(
+            method='test', params=params, request_id=request_id
+        )
+
+    def docs_generate(self, compile: bool = None, request_id: int = 1):
+        params = {}
+        if compile is not None:
+            params['compile'] = compile
+        return self.request(
+            method='docs.generate', params=params, request_id=request_id
+        )
+
+    def compile_sql(
+        self,
+        sql: str,
+        name: str = 'test_compile',
+        macros: Optional[str] = None,
+        request_id: int = 1,
+    ):
+        sql = base64.b64encode(sql.encode('utf-8')).decode('utf-8')
+        params = {
+            'name': name,
+            'sql': sql,
+            'macros': macros,
+        }
+        return self.request(
+            method='compile_sql', params=params, request_id=request_id
+        )
+
+    def run_sql(
+        self,
+        sql: str,
+        name: str = 'test_run',
+        macros: Optional[str] = None,
+        request_id: int = 1,
+    ):
+        sql = base64.b64encode(sql.encode('utf-8')).decode('utf-8')
+        params = {
+            'name': name,
+            'sql': sql,
+            'macros': macros,
+        }
+        return self.request(
+            method='run_sql', params=params, request_id=request_id
+        )
+
+    def is_result(self, data: Dict[str, Any], id=None) -> Dict[str, Any]:
+        if id is not None:
+            assert data['id'] == id
+        assert data['jsonrpc'] == '2.0'
+        assert 'result' in data
+        assert 'error' not in data
+        return data['result']
+
+    def is_async_result(self, data: Dict[str, Any], id=None) -> str:
+        result = self.is_result(data, id)
+        assert 'request_token' in result
+        return result['request_token']
+
+    def is_error(self, data: Dict[str, Any], id=None) -> Dict[str, Any]:
+        if id is not None:
+            assert data['id'] == id
+        assert data['jsonrpc'] == '2.0'
+        assert 'result' not in data
+        assert 'error' in data
+        return data['error']
+
+    def async_wait(self, token: str, timeout: int = 60, status='success') -> Dict[str, Any]:
+        start = time.time()
+        while True:
+            time.sleep(0.5)
+            response = self.poll(token)
+            if 'error' in response:
+                return response
+            result = self.is_result(response)
+            assert 'status' in result
+            if result['status'] == status:
+                return response
+            delta = (time.time() - start)
+            assert timeout > delta, \
+                f'At time {delta}, never saw {status}.\nLast response: {result}'
+
+
+def _first_server(cwd, cli_vars, profiles_dir, criteria):
+    stored = None
+    for _ in range(5):
+        port = random.randint(20000, 65535)
+
+        proc = ServerProcess(
+            cwd=cwd,
+            cli_vars=cli_vars,
+            profiles_dir=str(profiles_dir),
+            port=port,
+            criteria=criteria,
+        )
+        try:
+            proc.start()
+        except NoServerException as exc:
+            stored = exc
+        else:
+            return proc
+    if stored:
+        raise stored
+
+
+@contextmanager
+def rpc_server(project_dir, schema, profiles_dir, criteria='ready'):
+    if isinstance(criteria, str):
+        criteria = (criteria,)
+    else:
+        criteria = tuple(criteria)
+
+    cli_vars = '{{test_run_schema: {}}}'.format(schema)
+
+    proc = _first_server(project_dir, cli_vars, profiles_dir, criteria)
+    yield proc
+    if proc.is_alive():
+        os.kill(proc.pid, signal.SIGKILL)
+    proc.join()
+
+
+@pytest.fixture
+def unique_schema() -> str:
+    return "test{}{:04}".format(int(time.time()), random.randint(0, 9999))
+
+
+@pytest.fixture
+def profiles_dir(tmpdir):
+    return tmpdir.mkdir('profile')
+
+
+@pytest.fixture
+def postgres_profile_data(unique_schema):
+    return {
+        'config': {
+            'send_anonymous_usage_stats': False
+        },
+        'test': {
+            'outputs': {
+                'default': {
+                    'type': 'postgres',
+                    'threads': 4,
+                    'host': 'database',
+                    'port': 5432,
+                    'user': 'root',
+                    'pass': 'password',
+                    'dbname': 'dbt',
+                    'schema': unique_schema,
+                },
+            },
+            'target': 'default'
+        }
+    }
+
+
+@pytest.fixture
+def postgres_profile(profiles_dir, postgres_profile_data) -> Dict[str, Any]:
+    path = os.path.join(profiles_dir, 'profiles.yml')
+    with open(path, 'w') as fp:
+        fp.write(yaml.safe_dump(postgres_profile_data))
+    return postgres_profile_data
+
+
+@pytest.fixture
+def project_dir(tmpdir):
+    return tmpdir.mkdir('project')
+
+
+class ProjectDefinition:
+    def __init__(
+        self,
+        name='test',
+        version='0.1.0',
+        profile='test',
+        project_data=None,
+        packages=None,
+        models=None,
+        macros=None,
+    ):
+        self.project = {
+            'name': name,
+            'version': version,
+            'profile': profile,
+        }
+        if project_data:
+            self.project.update(project_data)
+        self.packages = packages
+        self.models = models
+        self.macros = macros
+
+    def _write_recursive(self, path, inputs):
+        for name, value in inputs.items():
+            if name.endswith('.sql'):
+                path.join(name).write(value)
+            elif name.endswith('.yml'):
+                if isinstance(value, str):
+                    data = value
+                else:
+                    data = yaml.safe_dump(value)
+                path.join(name).write(data)
+            else:
+                self._write_recursive(path.mkdir(name), value)
+
+    def write_packages(self, project_dir, remove=False):
+        if remove:
+            project_dir.join('packages.yml').remove()
+        if self.packages is not None:
+            if isinstance(self.packages, str):
+                data = self.packages
+            else:
+                data = yaml.safe_dump(self.packages)
+            project_dir.join('packages.yml').write(data)
+
+    def write_config(self, project_dir, remove=False):
+        cfg = project_dir.join('dbt_project.yml')
+        if remove:
+            cfg.remove()
+        cfg.write(yaml.safe_dump(self.project))
+
+    def write_models(self, project_dir, remove=False):
+        if remove:
+            project_dir.join('models').remove()
+
+        if self.models is not None:
+            self._write_recursive(project_dir.mkdir('models'), self.models)
+
+    def write_macros(self, project_dir, remove=False):
+        if remove:
+            project_dir.join('macros').remove()
+
+        if self.macros is not None:
+            self._write_recursive(project_dir.mkdir('macros'), self.macros)
+
+    def write_to(self, project_dir, remove=False):
+        if remove:
+            project_dir.remove()
+            project_dir.mkdir()
+        self.write_packages(project_dir)
+        self.write_config(project_dir)
+        self.write_models(project_dir)
+        self.write_macros(project_dir)
+
+
+class TestArgs:
+    def __init__(self, profiles_dir, which='run-operation', kwargs={}):
+        self.which = which
+        self.single_threaded = False
+        self.profiles_dir = profiles_dir
+        self.profile = None
+        self.target = None
+        self.__dict__.update(kwargs)
+
+
+def execute(adapter, sql):
+    with adapter.connection_named('rpc-tests') as conn:
+        with conn.handle.cursor() as cursor:
+            try:
+                cursor.execute(sql)
+                conn.handle.commit()
+
+            except Exception as e:
+                if conn.handle and conn.handle.closed == 0:
+                    conn.handle.rollback()
+                print(sql)
+                print(e)
+                raise
+            finally:
+                conn.transaction_open = False
+
+
+@contextmanager
+def built_schema(project_dir, schema, profiles_dir, test_kwargs, project_def):
+    # make our args, write our project out
+    args = TestArgs(profiles_dir=profiles_dir, kwargs=test_kwargs)
+    project_def.write_to(project_dir)
+    # build a config of our own
+    os.chdir(project_dir)
+    start = os.getcwd()
+    try:
+        cfg = RuntimeConfig.from_args(args)
+    finally:
+        os.chdir(start)
+    register_adapter(cfg)
+    adapter = get_adapter(cfg)
+    execute(adapter, 'drop schema if exists {} cascade'.format(schema))
+    execute(adapter, 'create schema {}'.format(schema))
+    yield
+    adapter = get_adapter(cfg)
+    adapter.cleanup_connections()
+    execute(adapter, 'drop schema if exists {} cascade'.format(schema))
diff --git a/test/rpc/test_base.py b/test/rpc/test_base.py
new file mode 100644
index 00000000000..69796ded5fe
--- /dev/null
+++ b/test/rpc/test_base.py
@@ -0,0 +1,253 @@
+import time
+from .fixtures import (
+    ProjectDefinition, rpc_server, Querier, project_dir, profiles_dir,
+    postgres_profile, unique_schema, postgres_profile_data, built_schema,
+)
+
+
+def test_rpc_basics(project_dir, profiles_dir, postgres_profile, unique_schema):
+    project = ProjectDefinition(
+        models={'my_model.sql': 'select 1 as id'}
+    )
+    server_ctx = rpc_server(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir
+    )
+    schema_ctx = built_schema(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir,
+        test_kwargs={}, project_def=project,
+    )
+    with schema_ctx, server_ctx as server:
+        querier = Querier(server)
+
+        token = querier.is_async_result(querier.run_sql('select 1 as id'))
+        querier.is_result(querier.async_wait(token))
+
+        token = querier.is_async_result(querier.run())
+        querier.is_result(querier.async_wait(token))
+
+        token = querier.is_async_result(querier.run_sql('select * from {{ ref("my_model") }}'))
+        querier.is_result(querier.async_wait(token))
+
+        token = querier.is_async_result(querier.run_sql('select * from {{ reff("my_model") }}'))
+        querier.is_error(querier.async_wait(token))
+
+
+def test_rpc_deps(project_dir, profiles_dir, postgres_profile, unique_schema):
+    project = ProjectDefinition(
+        models={
+            'my_model.sql': 'select 1 as id',
+        },
+        packages={
+            'packages': [
+                {
+                    'package': 'fishtown-analytics/dbt_utils',
+                    'version': '0.2.1',
+                },
+            ],
+        },
+    )
+    server_ctx = rpc_server(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir
+    )
+    schema_ctx = built_schema(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir,
+        test_kwargs={}, project_def=project,
+    )
+    with schema_ctx, server_ctx as server:
+        querier = Querier(server)
+
+        # we should be able to run sql queries at startup
+        token = querier.is_async_result(querier.run_sql('select 1 as id'))
+        querier.is_result(querier.async_wait(token))
+
+        # the status should be something positive
+        querier.is_result(querier.status())
+
+        # deps should pass
+        token = querier.is_async_result(querier.deps())
+        querier.is_result(querier.async_wait(token))
+
+        # queries should work after deps
+        tok1 = querier.is_async_result(querier.run())
+        tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))
+
+        querier.is_result(querier.async_wait(tok2))
+        querier.is_result(querier.async_wait(tok1))
+
+        # now break the project by giving an invalid package name
+        # (`dbt_util` instead of `dbt_utils`)
+        project.packages['packages'][0]['package'] = 'fishtown-analytics/dbt_util'
+        project.write_packages(project_dir, remove=True)
+
+        # queries should still work because we haven't reloaded
+        tok1 = querier.is_async_result(querier.run())
+        tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))
+
+        querier.is_result(querier.async_wait(tok2))
+        querier.is_result(querier.async_wait(tok1))
+
+        # now run deps again, it should be sad
+        token = querier.is_async_result(querier.deps())
+        querier.is_error(querier.async_wait(token))
+        # it should also not be running.
+        result = querier.is_result(querier.ps(active=True, completed=False))
+        assert result['rows'] == []
+
+        # fix packages again
+        project.packages['packages'][0]['package'] = 'fishtown-analytics/dbt_utils'
+        project.write_packages(project_dir, remove=True)
+        # keep queries broken, we haven't run deps yet
+        querier.is_error(querier.run())
+
+        # deps should pass now
+        token = querier.is_async_result(querier.deps())
+        querier.is_result(querier.async_wait(token))
+        querier.is_result(querier.status())
+
+        tok1 = querier.is_async_result(querier.run())
+        tok2 = querier.is_async_result(querier.run_sql('select 1 as id'))
+
+        querier.is_result(querier.async_wait(tok2))
+        querier.is_result(querier.async_wait(tok1))
+
+
+bad_schema_yml = '''
+version: 2
+sources:
+  - name: test_source
+    loader: custom
+    schema: "{{ var('test_run_schema') }}"
+    tables:
+      - name: test_table
+        identifier: source
+        tests:
+          - relationships:
+              # this is invalid
+              - column_name: favorite_color
+              - to: ref('descendant_model')
+              - field: favorite_color
+'''
+
+fixed_schema_yml = '''
+version: 2
+sources:
+  - name: test_source
+    loader: custom
+    schema: "{{ var('test_run_schema') }}"
+    tables:
+      - name: test_table
+        identifier: source
+'''
+
+
+def test_rpc_status_error(project_dir, profiles_dir, postgres_profile, unique_schema):
+    project = ProjectDefinition(
+        models={
+            'descendant_model.sql': 'select * from {{ source("test_source", "test_table") }}',
+            'schema.yml': bad_schema_yml,
+        }
+    )
+    server_ctx = rpc_server(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir,
+        criteria='error',
+    )
+    schema_ctx = built_schema(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir,
+        test_kwargs={}, project_def=project,
+    )
+    with schema_ctx, server_ctx as server:
+        querier = Querier(server)
+
+        # the status should be an error result
+        result = querier.is_result(querier.status())
+        assert 'error' in result
+        assert 'message' in result['error']
+        assert 'Invalid test config' in result['error']['message']
+        assert 'status' in result
+        assert result['status'] == 'error'
+        assert 'logs' in result
+        logs = result['logs']
+        assert len(logs) > 0
+        for key in ('message', 'timestamp', 'levelname', 'level'):
+            assert key in logs[0]
+        assert 'pid' in result
+        assert server.pid == result['pid']
+
+        error = querier.is_error(querier.compile_sql('select 1 as id'))
+        assert 'code' in error
+        assert error['code'] == 10011
+        assert 'message' in error
+        assert error['message'] == 'RPC server failed to compile project, call the "status" method for compile status'
+        assert 'data' in error
+        assert 'message' in error['data']
+        assert 'Invalid test config' in error['data']['message']
+
+        # deps should fail because it still can't parse the manifest
+        token = querier.is_async_result(querier.deps())
+        querier.is_error(querier.async_wait(token))
+
+        # and not resolve the issue
+        result = querier.is_result(querier.status())
+        assert 'error' in result
+        assert 'message' in result['error']
+        assert 'Invalid test config' in result['error']['message']
+
+        error = querier.is_error(querier.compile_sql('select 1 as id'))
+        assert 'code' in error
+        assert error['code'] == 10011
+
+        project.models['schema.yml'] = fixed_schema_yml
+        project.write_models(project_dir, remove=True)
+
+        # deps should work
+        token = querier.is_async_result(querier.deps())
+        querier.is_result(querier.async_wait(token))
+
+        result = querier.is_result(querier.status())
+        assert result.get('error') is None
+        assert 'status' in result
+        assert result['status'] == 'ready'
+
+        querier.is_result(querier.compile_sql('select 1 as id'))
+
+
+def test_gc_change_interval(project_dir, profiles_dir, postgres_profile, unique_schema):
+    project = ProjectDefinition(
+        models={'my_model.sql': 'select 1 as id'}
+    )
+    server_ctx = rpc_server(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir
+    )
+    schema_ctx = built_schema(
+        project_dir=project_dir, schema=unique_schema, profiles_dir=profiles_dir,
+        test_kwargs={}, project_def=project,
+    )
+    with schema_ctx, server_ctx as server:
+        querier = Querier(server)
+
+        for _ in range(10):
+            token = querier.is_async_result(querier.run())
+            querier.is_result(querier.async_wait(token))
+
+        result = querier.is_result(querier.ps(True, True))
+        assert len(result['rows']) == 10
+
+        result = querier.is_result(querier.gc(settings=dict(maxsize=1000, reapsize=5, auto_reap_age=0.1)))
+
+        for k in ('deleted', 'missing', 'running'):
+            assert k in result
+            assert len(result[k]) == 0
+
+        time.sleep(0.5)
+
+        result = querier.is_result(querier.ps(True, True))
+        assert len(result['rows']) == 0
+
+        result = querier.is_result(querier.gc(settings=dict(maxsize=2, reapsize=5, auto_reap_age=100000)))
+        for k in ('deleted', 'missing', 'running'):
+            assert k in result
+            assert len(result[k]) == 0
+
+        time.sleep(0.5)
+
+        for _ in range(10):
+            token = querier.is_async_result(querier.run())
+            querier.is_result(querier.async_wait(token))
+
+        time.sleep(0.5)
+        result = querier.is_result(querier.ps(True, True))
+        assert len(result['rows']) == 2
diff --git a/tox.ini b/tox.ini
index 83eb19cb69b..50e570b051b 100644
--- a/tox.ini
+++ b/tox.ini
@@ -30,11 +30,13 @@ passenv = *
 setenv = HOME=/home/tox
 commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*'
+    /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 test/rpc/*'
 deps =
     -e {toxinidir}/core
    -e {toxinidir}/plugins/postgres
     -r{toxinidir}/dev_requirements.txt
+
 
 [testenv:integration-snowflake-py36]
 basepython = python3.6
 passenv = *
@@ -102,7 +104,8 @@ basepython = python3.7
 passenv = *
 setenv = HOME=/home/tox
-commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*'
+commands = /bin/bash -c '{envpython} -m pytest --durations 0 -v -m profile_postgres {posargs} -n4 test/integration/*' && \
+    /bin/bash -c '{envpython} -m pytest --durations 0 -v {posargs} -n4 test/rpc/*'
 deps =
     -e {toxinidir}/core
     -e {toxinidir}/plugins/postgres