Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC server (#1274) #1301

Merged
merged 8 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def link_graph(self, linker, manifest):
if cycle:
raise RuntimeError("Found a cycle: {}".format(cycle))

def compile(self, manifest):
def compile(self, manifest, write=True):
linker = Linker()

self.link_graph(linker, manifest)
Expand All @@ -195,25 +195,35 @@ def compile(self, manifest):
manifest.macros.items()):
stats[node.resource_type] += 1

self.write_graph_file(linker, manifest)
if write:
self.write_graph_file(linker, manifest)
print_compile_stats(stats)

return linker


def compile_manifest(config, manifest):
def compile_manifest(config, manifest, write=True):
compiler = Compiler(config)
compiler.initialize()
return compiler.compile(manifest)
return compiler.compile(manifest, write=write)


def compile_node(adapter, config, node, manifest, extra_context):
def _is_writable(node):
if not node.injected_sql:
return False

if dbt.utils.is_type(node, NodeType.Archive):
drewbanin marked this conversation as resolved.
Show resolved Hide resolved
return False

return True


def compile_node(adapter, config, node, manifest, extra_context, write=True):
compiler = Compiler(config)
node = compiler.compile_node(node, manifest, extra_context)
node = _inject_runtime_config(adapter, node, extra_context)

if(node.injected_sql is not None and
not (dbt.utils.is_type(node, NodeType.Archive))):
if write and _is_writable(node):
logger.debug('Writing injected SQL for node "{}"'.format(
node.unique_id))

Expand Down
19 changes: 1 addition & 18 deletions core/dbt/config/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,5 @@

from .renderer import ConfigRenderer
from .profile import Profile, UserConfig
from .profile import Profile, UserConfig, PROFILES_DIR
from .project import Project
from .profile import read_profile
from .profile import PROFILES_DIR
from .runtime import RuntimeConfig


def read_profiles(profiles_dir=None):
"""This is only used in main, for some error handling"""
if profiles_dir is None:
profiles_dir = PROFILES_DIR

raw_profiles = read_profile(profiles_dir)

if raw_profiles is None:
profiles = {}
else:
profiles = {k: v for (k, v) in raw_profiles.items() if k != 'config'}

return profiles
8 changes: 2 additions & 6 deletions core/dbt/config/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,14 +335,12 @@ def from_raw_profiles(cls, raw_profiles, profile_name, cli_vars,
)

@classmethod
def from_args(cls, args, project_profile_name=None, cli_vars=None):
def from_args(cls, args, project_profile_name=None):
"""Given the raw profiles as read from disk and the name of the desired
profile if specified, return the profile component of the runtime
config.

:param args argparse.Namespace: The arguments as parsed from the cli.
:param cli_vars dict: The command-line variables passed as arguments,
as a dict.
:param project_profile_name Optional[str]: The profile name, if
specified in a project.
:raises DbtProjectError: If there is no profile name specified in the
Expand All @@ -352,9 +350,7 @@ def from_args(cls, args, project_profile_name=None, cli_vars=None):
target could not be found.
:returns Profile: The new Profile object.
"""
if cli_vars is None:
cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))

cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))
threads_override = getattr(args, 'threads', None)
target_override = getattr(args, 'target', None)
raw_profiles = read_profile(args.profiles_dir)
Expand Down
4 changes: 4 additions & 0 deletions core/dbt/config/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ def from_project_root(cls, project_root, cli_vars):
def from_current_directory(cls, cli_vars):
return cls.from_project_root(os.getcwd(), cli_vars)

@classmethod
def from_args(cls, args):
return cls.from_current_directory(getattr(args, 'vars', '{}'))

def hashed_name(self):
return hashlib.md5(self.project_name.encode('utf-8')).hexdigest()

Expand Down
7 changes: 2 additions & 5 deletions core/dbt/config/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,13 @@ def from_args(cls, args):
:raises DbtProfileError: If the profile is invalid or missing.
:raises ValidationException: If the cli variables are invalid.
"""
cli_vars = parse_cli_vars(getattr(args, 'vars', '{}'))

# build the project and read in packages.yml
project = Project.from_current_directory(cli_vars)
project = Project.from_args(args)

# build the profile
profile = Profile.from_args(
args=args,
project_profile_name=project.profile_name,
cli_vars=cli_vars
project_profile_name=project.profile_name
)

return cls.from_parts(
Expand Down
10 changes: 10 additions & 0 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,13 @@ def get_used_schemas(self):

def get_used_databases(self):
return frozenset(node.database for node in self.nodes.values())

def deepcopy(self, config=None):
return Manifest(
nodes={k: v.incorporate() for k, v in self.nodes.items()},
macros={k: v.incorporate() for k, v in self.macros.items()},
docs={k: v.incorporate() for k, v in self.docs.items()},
generated_at=self.generated_at,
disabled=[n.incorporate() for n in self.disabled],
config=config
)
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/unparsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
NodeType.Seed,
# we need this if parse_node is going to handle archives.
NodeType.Archive,
NodeType.RPCCall,
]
},
},
Expand Down
76 changes: 76 additions & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,79 @@ class FreshnessRunOutput(APIObject):

def __init__(self, meta, sources):
super(FreshnessRunOutput, self).__init__(meta=meta, sources=sources)


REMOTE_COMPILE_RESULT_CONTRACT = {
'type': 'object',
'additionalProperties': False,
'properties': {
'raw_sql': {
'type': 'string',
},
'compiled_sql': {
'type': 'string',
},
'timing': {
'type': 'array',
'items': TIMING_INFO_CONTRACT,
},
},
'required': ['raw_sql', 'compiled_sql', 'timing']
}


class RemoteCompileResult(APIObject):
SCHEMA = REMOTE_COMPILE_RESULT_CONTRACT

def __init__(self, raw_sql, compiled_sql, timing=None, **kwargs):
if timing is None:
timing = []
super(RemoteCompileResult, self).__init__(
raw_sql=raw_sql,
compiled_sql=compiled_sql,
timing=timing,
**kwargs
)

@property
def node(self):
return None

@property
def error(self):
return None


REMOTE_RUN_RESULT_CONTRACT = deep_merge(REMOTE_COMPILE_RESULT_CONTRACT, {
'properties': {
'table': {
drewbanin marked this conversation as resolved.
Show resolved Hide resolved
'type': 'object',
'properties': {
'column_names': {
'type': 'array',
'items': {'type': 'string'},
},
'rows': {
'type': 'array',
# any item type is ok
},
},
'required': ['rows', 'column_names'],
},
},
'required': ['table'],
})


class RemoteRunResult(RemoteCompileResult):
SCHEMA = REMOTE_RUN_RESULT_CONTRACT

def __init__(self, raw_sql, compiled_sql, timing=None, table=None):
if table is None:
table = []
super(RemoteRunResult, self).__init__(
raw_sql=raw_sql,
compiled_sql=compiled_sql,
timing=timing,
table=table
)
4 changes: 4 additions & 0 deletions core/dbt/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class InternalException(Exception):
pass


class RPCException(Exception):
pass


class RuntimeException(RuntimeError, Exception):
def __init__(self, msg, node=None):
self.stack = []
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,14 @@
logging.getLogger('google').setLevel(logging.INFO)
logging.getLogger('snowflake.connector').setLevel(logging.INFO)
logging.getLogger('parsedatetime').setLevel(logging.INFO)
# we never want to seek werkzeug logs
logging.getLogger('werkzeug').setLevel(logging.CRITICAL)

# provide this for the cache.
CACHE_LOGGER = logging.getLogger('dbt.cache')
# provide this for RPC connection logging
RPC_LOGGER = logging.getLogger('dbt.rpc')


# Redirect warnings through our logging setup
# They will be logged to a file below
Expand Down
Loading