Skip to content

Commit

Permalink
Merge pull request #1699 from fishtown-analytics/feature/sighup-reloa…
Browse files Browse the repository at this point in the history
…d-manifest

reload RPC server manifest on SIGHUP (#1684)
  • Loading branch information
beckjake authored Aug 27, 2019
2 parents eb8bce4 + bd63aac commit fe48478
Show file tree
Hide file tree
Showing 28 changed files with 1,416 additions and 735 deletions.
10 changes: 6 additions & 4 deletions core/dbt/contracts/connection.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from hologram.helpers import StrEnum, NewPatternType, ExtensibleJsonSchemaMixin
from hologram.helpers import (
StrEnum, register_pattern, ExtensibleJsonSchemaMixin
)
from hologram import JsonSchemaMixin
from dbt.contracts.util import Replaceable

from dataclasses import dataclass
from typing import Any, Optional
from typing import Any, Optional, NewType


Identifier = NewPatternType('Identifier', r'^[A-Za-z_][A-Za-z0-9_]+$')
Identifier = NewType('Identifier', str)
register_pattern(Identifier, r'^[A-Za-z_][A-Za-z0-9_]+$')


class ConnectionState(StrEnum):
Expand Down
9 changes: 6 additions & 3 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from dataclasses import dataclass, field
from typing import Optional, Union, List, Dict, Any, Type, Tuple
from typing import Optional, Union, List, Dict, Any, Type, Tuple, NewType

from hologram import JsonSchemaMixin
from hologram.helpers import StrEnum, NewPatternType, ExtensibleJsonSchemaMixin
from hologram.helpers import (
StrEnum, register_pattern, ExtensibleJsonSchemaMixin
)

import dbt.clients.jinja
import dbt.flags
Expand Down Expand Up @@ -46,7 +48,8 @@ def insensitive_patterns(*patterns: str):
return '^({})$'.format('|'.join(lowercased))


Severity = NewPatternType('Severity', insensitive_patterns('warn', 'error'))
Severity = NewType('Severity', str)
register_pattern(Severity, insensitive_patterns('warn', 'error'))


@dataclass
Expand Down
12 changes: 7 additions & 5 deletions core/dbt/contracts/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@
# from dbt.utils import JSONEncoder

from hologram import JsonSchemaMixin
from hologram.helpers import HyphenatedJsonSchemaMixin, NewPatternType, \
from hologram.helpers import HyphenatedJsonSchemaMixin, register_pattern, \
ExtensibleJsonSchemaMixin

from dataclasses import dataclass, field
from typing import Optional, List, Dict, Union, Any
from typing import Optional, List, Dict, Union, Any, NewType

PIN_PACKAGE_URL = 'https://docs.getdbt.com/docs/package-management#section-specifying-package-versions' # noqa
DEFAULT_SEND_ANONYMOUS_USAGE_STATS = True
DEFAULT_USE_COLORS = True


Name = NewPatternType('Name', r'^[^\d\W]\w*\Z')
Name = NewType('Name', str)
register_pattern(Name, r'^[^\d\W]\w*\Z')

# this does not support the full semver (does not allow a trailing -fooXYZ) and
# is not restrictive enough for full semver, (allows '1.0'). But it's like
# 'semver lite'.
SemverString = NewPatternType(
'SemverString',
SemverString = NewType('SemverString', str)
register_pattern(
SemverString,
r'^(?:0|[1-9]\d*)\.(?:0|[1-9]\d*)(\.(?:0|[1-9]\d*))?$',
)

Expand Down
55 changes: 40 additions & 15 deletions core/dbt/deprecations.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
from typing import Optional, Set, List, Dict, ClassVar

import dbt.links
import dbt.exceptions
import dbt.flags


class DBTDeprecation:
name = None
description = None
_name: ClassVar[Optional[str]] = None
_description: ClassVar[Optional[str]] = None

@property
def name(self) -> str:
if self._name is not None:
return self._name
raise NotImplementedError(
'name not implemented for {}'.format(self)
)

@property
def description(self) -> str:
if self._description is not None:
return self._description
raise NotImplementedError(
'description not implemented for {}'.format(self)
)

def show(self, *args, **kwargs):
def show(self, *args, **kwargs) -> None:
if self.name not in active_deprecations:
desc = self.description.format(**kwargs)
dbt.exceptions.warn_or_error(
Expand All @@ -16,8 +35,10 @@ def show(self, *args, **kwargs):


class DBTRepositoriesDeprecation(DBTDeprecation):
name = "repositories"
description = """The dbt_project.yml configuration option 'repositories' is
_name = "repositories"

_description = """
The dbt_project.yml configuration option 'repositories' is
deprecated. Please place dependencies in the `packages.yml` file instead.
The 'repositories' option will be removed in a future version of dbt.
Expand All @@ -26,12 +47,13 @@ class DBTRepositoriesDeprecation(DBTDeprecation):
# Example packages.yml contents:
{recommendation}
"""
""".lstrip()


class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation):
name = 'generate-schema-name-single-arg'
description = '''As of dbt v0.14.0, the `generate_schema_name` macro
_name = 'generate-schema-name-single-arg'

_description = '''As of dbt v0.14.0, the `generate_schema_name` macro
accepts a second "node" argument. The one-argument form of `generate_schema_name`
is deprecated, and will become unsupported in a future release.
Expand All @@ -47,11 +69,12 @@ class GenerateSchemaNameSingleArgDeprecated(DBTDeprecation):
https://docs.getdbt.com/docs/adapter"""


def renamed_method(old_name, new_name):
def renamed_method(old_name: str, new_name: str):

class AdapterDeprecationWarning(DBTDeprecation):
name = 'adapter:{}'.format(old_name)
description = _adapter_renamed_description.format(old_name=old_name,
new_name=new_name)
_name = 'adapter:{}'.format(old_name)
_description = _adapter_renamed_description.format(old_name=old_name,
new_name=new_name)

dep = AdapterDeprecationWarning()
deprecations_list.append(dep)
Expand All @@ -71,14 +94,16 @@ def warn(name, *args, **kwargs):
# these are globally available
# since modules are only imported once, active_deprecations is a singleton

active_deprecations = set()
active_deprecations: Set[str] = set()

deprecations_list = [
deprecations_list: List[DBTDeprecation] = [
DBTRepositoriesDeprecation(),
GenerateSchemaNameSingleArgDeprecated(),
]

deprecations = {d.name: d for d in deprecations_list}
deprecations: Dict[str, DBTDeprecation] = {
d.name: d for d in deprecations_list
}


def reset_deprecations():
Expand Down
27 changes: 27 additions & 0 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,33 @@ def data(self):
}


class RPCCompiling(RuntimeException):
CODE = 10010
MESSAGE = (
'RPC server is compiling the project, call the "status" method for'
' compile status'
)


class RPCLoadException(RuntimeException):
CODE = 10011
MESSAGE = (
'RPC server failed to compile project, call the "status" method for'
' compile status'
)

def __init__(self, cause):
self.cause = cause
self.message = '{}: {}'.format(self.MESSAGE, self.cause['message'])
super().__init__(self.message)

def data(self):
return {
'cause': self.cause,
'message': self.message
}


class DatabaseException(RuntimeException):
CODE = 10003
MESSAGE = "Database Error"
Expand Down
94 changes: 87 additions & 7 deletions core/dbt/rpc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from hologram import JsonSchemaMixin
from hologram.helpers import StrEnum
from jsonrpc.exceptions import \
JSONRPCDispatchException, \
JSONRPCInvalidParams, \
Expand All @@ -8,7 +10,10 @@
from jsonrpc.jsonrpc import JSONRPCRequest
from jsonrpc.jsonrpc2 import JSONRPC20Response

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
import json
import multiprocessing
import os
Expand Down Expand Up @@ -139,6 +144,8 @@ def __getitem__(self, key):
return func

task = self.manager.rpc_task(key)
if task is None:
raise KeyError(key)
return self.rpc_factory(task)


Expand Down Expand Up @@ -319,22 +326,75 @@ def state(self):
)


class ManifestStatus(StrEnum):
Init = 'init'
Compiling = 'compiling'
Ready = 'ready'
Error = 'error'


@dataclass
class LastCompile(JsonSchemaMixin):
status: ManifestStatus
error: Optional[Dict[str, Any]] = None
timestamp: datetime = field(default_factory=datetime.utcnow)


class TaskManager:
def __init__(self):
def __init__(self, args, config):
self.args = args
self.config = config
self.tasks = {}
self.completed = {}
self._rpc_task_map = {}
self._rpc_function_map = {}
self._last_compile = LastCompile(status=ManifestStatus.Init)
self._lock = multiprocessing.Lock()

def add_request(self, request_handler):
self.tasks[request_handler.task_id] = request_handler

def add_task_handler(self, task):
self._rpc_task_map[task.METHOD_NAME] = task
def reserve_handler(self, task):
self._rpc_task_map[task.METHOD_NAME] = None

def add_task_handler(self, task, manifest):
self._rpc_task_map[task.METHOD_NAME] = task(
self.args, self.config, manifest
)

def rpc_task(self, method_name):
return self._rpc_task_map[method_name]
with self._lock:
return self._rpc_task_map[method_name]

def ready(self):
with self._lock:
return self._last_compile.status == ManifestStatus.Ready

def set_compiling(self):
assert self._last_compile.status != ManifestStatus.Compiling, \
f'invalid state {self._last_compile.status}'
with self._lock:
self._last_compile = LastCompile(status=ManifestStatus.Compiling)

def set_compile_exception(self, exc):
assert self._last_compile.status == ManifestStatus.Compiling, \
f'invalid state {self._last_compile.status}'
self._last_compile = LastCompile(
error={'message': str(exc)},
status=ManifestStatus.Error
)

def set_ready(self):
assert self._last_compile.status == ManifestStatus.Compiling, \
f'invalid state {self._last_compile.status}'
self._last_compile = LastCompile(status=ManifestStatus.Ready)

def process_status(self):
with self._lock:
last_compile = self._last_compile

status = last_compile.to_dict()
status['pid'] = os.getpid()
return status

def process_listing(self, active=True, completed=False):
included_tasks = {}
Expand Down Expand Up @@ -395,11 +455,26 @@ def process_kill(self, task_id):
result['finished'] = True
return result

def process_currently_compiling(self, *args, **kwargs):
raise dbt_error(dbt.exceptions.RPCCompiling('compile in progress'))

def process_compilation_error(self, *args, **kwargs):
raise dbt_error(
dbt.exceptions.RPCLoadException(self._last_compile.error)
)

def rpc_builtin(self, method_name):
if method_name == 'ps':
return self.process_listing
if method_name == 'kill' and os.name != 'nt':
return self.process_kill
if method_name == 'status':
return self.process_status
if method_name in self._rpc_task_map:
if self._last_compile.status == ManifestStatus.Compiling:
return self.process_currently_compiling
if self._last_compile.status == ManifestStatus.Error:
return self.process_compilation_error
return None

def mark_done(self, request_handler):
Expand All @@ -411,10 +486,14 @@ def mark_done(self, request_handler):
self.completed[task_id] = self.tasks.pop(task_id)

def methods(self):
rpc_builtin_methods = ['ps']
rpc_builtin_methods = ['ps', 'status']
if os.name != 'nt':
rpc_builtin_methods.append('kill')
return list(self._rpc_task_map) + rpc_builtin_methods

with self._lock:
task_map = list(self._rpc_task_map)

return task_map + rpc_builtin_methods


class ResponseManager(JSONRPCResponseManager):
Expand All @@ -440,6 +519,7 @@ def handle(cls, http_request, task_manager):
return JSONRPC20Response(error=JSONRPCInvalidRequest()._data)

track_rpc_request(request.method)

dispatcher = RequestDispatcher(
http_request,
request,
Expand Down
Loading

0 comments on commit fe48478

Please sign in to comment.