adapters: postgres & redshift #259

Merged
merged 49 commits into from
Jan 20, 2017
Changes from 6 commits
Commits
3145139
initial commit, acquire_connection working for postgres with unit test
Dec 28, 2016
13fe3e5
cleanup strings, add docker unit test harness
Dec 28, 2016
973c4ad
move execute_without_auto_commit to postgres adapter
Dec 28, 2016
f4f0b41
wired in w/ strict mode
Jan 6, 2017
d0a6f17
turn strict mode on for all tests, not just default ones
Jan 6, 2017
32e20a3
add schema to unit test harness
Jan 6, 2017
9a46a2e
merged development
Jan 6, 2017
c612333
back to pep8 compliance
Jan 6, 2017
7023921
again, improve test harness
Jan 6, 2017
e848e59
remove now-unused test scripts
Jan 6, 2017
7076a7d
implement connection sharing w/ test, reset cache on dbt invocation i…
Jan 6, 2017
b2e31c9
move drop into adapter
Jan 7, 2017
0886bc4
add truncate, rename functions & add master exception handler
Jan 9, 2017
31502f8
back to pep-8 compliance
Jan 9, 2017
f6e0813
test harness++
Jan 9, 2017
4a6750e
snowflake integration test (not really working yet)
Jan 9, 2017
5676c34
added snowflake adapter
Jan 10, 2017
dcb8278
tearing out most of schema helper
Jan 10, 2017
7a52b80
schema helper is dead (long live schema helper)
Jan 10, 2017
6251c07
possibly snowflake-ready?
Jan 10, 2017
64ee3eb
snowflake workin
Jan 10, 2017
d2ea805
merged development
Jan 11, 2017
62f2f68
snowflake / circle test harness
Jan 11, 2017
6023586
merged development
Jan 12, 2017
ec50446
two more tests
Jan 12, 2017
a5e71f0
get back into pep8 compliance
Jan 12, 2017
9c95a92
debug setup_db.sh
Jan 12, 2017
3a157fd
display a reasonable status message
Jan 12, 2017
114fb91
add date functions all around
Jan 12, 2017
b959fb8
use absolute import to resolve ns conflict
Jan 13, 2017
f5d7be8
group tests by warehouse type
Jan 13, 2017
2913e81
Merge branch 'development' of github.com:analyst-collective/dbt into …
Jan 14, 2017
736dcf9
remove targets from seeder, and mark it as not implemented for snowflake
Jan 14, 2017
cd1fe4f
pep8
Jan 14, 2017
7c0f26b
rip snowflake support out for windows
Jan 15, 2017
6cf9684
fix appveyor
Jan 15, 2017
887fe85
only do snowflake imports in snowflake adapter module
Jan 15, 2017
2c0e5ec
fix SnowflakeAdapter NameError
Jan 15, 2017
6be3d44
improved error handling
Jan 16, 2017
6e14eb8
pep8
Jan 16, 2017
95e4a75
Merge branch 'development' of github.com:analyst-collective/dbt into …
Jan 16, 2017
4970d6d
update required version of snowflake connector
Jan 16, 2017
15be495
run tests on python 3.6 instead of 3.5
Jan 20, 2017
64e4b67
add python 3.6 to global pyenv
Jan 20, 2017
e30931a
also isntall python 3.6
Jan 20, 2017
527eaa8
install!
Jan 20, 2017
67cc3b4
sudo !!
Jan 20, 2017
73cde44
try python 2.7.12
Jan 20, 2017
2b3e8dd
sneak in a logger name change, and CHANGELOG
Jan 20, 2017
4 changes: 1 addition & 3 deletions Dockerfile
@@ -1,4 +1,4 @@
-FROM python
+FROM python:3.5

RUN apt-get update

@@ -14,5 +14,3 @@ COPY . /usr/src/app
WORKDIR /usr/src/app
RUN cd /usr/src/app
RUN ./test/setup.sh


2 changes: 1 addition & 1 deletion Makefile
@@ -6,7 +6,7 @@ test: test-unit test-integration

test-unit:
@echo "Unit test run starting..."
-tox -e unit-py27,unit-py35
+@docker-compose run test /usr/src/app/test/unit.sh

test-integration:
@echo "Integration test run starting..."
Empty file added dbt/adapters/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions dbt/adapters/factory.py
@@ -0,0 +1,9 @@
from dbt.adapters.postgres import PostgresAdapter

def get_adapter(target):
adapters = {
'postgres': PostgresAdapter,
'redshift': PostgresAdapter,
}

return adapters[target.target_type]
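
Note on the lookup above: `get_adapter` returns the adapter class itself rather than an instance, so callers can invoke classmethods such as `execute_model` directly on the result. The following is a minimal sketch of that pattern, not dbt's actual code. `Target` is a hypothetical namedtuple and `PostgresAdapter` is a stub standing in for the real class. It also shows that an unmapped `target_type` would surface as a plain `KeyError`.

```python
from collections import namedtuple

# Hypothetical stand-in for dbt's target object; only target_type is used here.
Target = namedtuple('Target', ['target_type'])


class PostgresAdapter:
    """Stub adapter; the real class lives in dbt/adapters/postgres.py."""

    @classmethod
    def execute_model(cls, project, target, model):
        return 'executed {} via {}'.format(model, cls.__name__)


def get_adapter(target):
    # Redshift reuses the postgres adapter, as in the diff above.
    adapters = {
        'postgres': PostgresAdapter,
        'redshift': PostgresAdapter,
    }
    return adapters[target.target_type]


adapter = get_adapter(Target('redshift'))
print(adapter.execute_model(None, Target('redshift'), 'my_model'))

try:
    get_adapter(Target('mysql'))
except KeyError as e:
    # An unknown warehouse type is reported only as the missing dictionary key.
    print('unsupported target type: {}'.format(e))
```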
150 changes: 150 additions & 0 deletions dbt/adapters/postgres.py
@@ -0,0 +1,150 @@
import copy
import psycopg2
import re
import time
import yaml

import dbt.flags as flags

from dbt.contracts.connection import validate_connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.schema import Schema, READ_PERMISSION_DENIED_ERROR

class PostgresAdapter:

@classmethod
def acquire_connection(cls, profile):

# profile requires some marshalling right now because it includes a
# wee bit of global config.
# TODO remove this
credentials = copy.deepcopy(profile)

credentials.pop('type', None)
credentials.pop('threads', None)

result = {
'type': 'postgres',
'state': 'init',
'handle': None,
'credentials': credentials
}

logger.debug('Acquiring postgres connection')

if flags.STRICT_MODE:
validate_connection(result)

return cls.open_connection(result)

@classmethod
def get_connection(cls, profile):
return cls.acquire_connection(profile)

@staticmethod
def create_table():
pass

@staticmethod
def drop_table():
pass

@classmethod
def execute_model(cls, project, target, model):
schema_helper = Schema(project, target)
parts = re.split(r'-- (DBT_OPERATION .*)', model.compiled_contents)
Review comment (Contributor):

We're probably going to rethink how DBT_OPERATION works in the future. I think it's fine to leave this how it is for now, but let's definitely refrain from copying this logic into future adapters!

profile = project.run_environment()
connection = cls.get_connection(profile)

if flags.STRICT_MODE:
validate_connection(connection)

handle = connection['handle']

status = 'None'
for i, part in enumerate(parts):
matches = re.match(r'^DBT_OPERATION ({.*})$', part)
if matches is not None:
instruction_string = matches.groups()[0]
instruction = yaml.safe_load(instruction_string)
function = instruction['function']
kwargs = instruction['args']

func_map = {
'expand_column_types_if_needed': \
lambda kwargs: schema_helper.expand_column_types_if_needed(
**kwargs)
}

func_map[function](kwargs)
else:
try:
handle, status = cls.add_query_to_transaction(
part, handle)
except psycopg2.ProgrammingError as e:
if "permission denied for" in e.diag.message_primary:
raise RuntimeError(READ_PERMISSION_DENIED_ERROR.format(
model=model.name,
error=str(e).strip(),
user=target.user,
))
else:
raise

handle.commit()
return status

@classmethod
def open_connection(cls, connection):
if connection.get('state') == 'open':
logger.debug('Connection is already open, skipping open.')
return connection

result = connection.copy()

try:
handle = psycopg2.connect(cls.get_connection_spec(connection))

result['handle'] = handle
result['state'] = 'open'
except psycopg2.Error as e:
logger.debug("Got an error when attempting to open a postgres "
"connection: '{}'"
.format(e))

result['handle'] = None
result['state'] = 'fail'

return result

@staticmethod
def get_connection_spec(connection):
credentials = connection.get('credentials')

return ("dbname='{}' user='{}' host='{}' password='{}' port='{}' "
"connect_timeout=10".format(
credentials.get('dbname'),
credentials.get('user'),
credentials.get('host'),
credentials.get('pass'),
credentials.get('port'),
))

@staticmethod
def add_query_to_transaction(sql, handle):
cursor = handle.cursor()

try:
logger.debug("SQL: %s", sql)
pre = time.time()
cursor.execute(sql)
post = time.time()
logger.debug("SQL status: %s in %0.2f seconds", cursor.statusmessage, post-pre)
return handle, cursor.statusmessage
except Exception as e:
handle.rollback()
logger.exception("Error running SQL: %s", sql)
logger.debug("rolling back connection")
raise e
finally:
cursor.close()
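
For readers unfamiliar with the `DBT_OPERATION` convention used in `execute_model`: the split works because `re.split` keeps the text of a capturing group in its result, so each operation comment becomes its own list element between the surrounding SQL chunks. The sketch below illustrates this under some assumptions. The compiled model text and the `args` payload are made-up placeholders, `split_model` is a hypothetical helper, and `json.loads` is swapped in for `yaml.safe_load` to keep the example stdlib-only.

```python
import json
import re

# Hypothetical compiled model; the args payload is a placeholder, not dbt's real one.
compiled_contents = (
    'create temporary table events__tmp as (select 1 as id);\n'
    '-- DBT_OPERATION {"function": "expand_column_types_if_needed", '
    '"args": {"temp_table": "events__tmp"}}\n'
    'insert into events (select * from events__tmp);\n'
)


def split_model(contents):
    """Yield ('operation', dict) or ('sql', str) pairs in document order."""
    # The capturing group makes re.split keep the DBT_OPERATION text as its own part.
    for part in re.split(r'-- (DBT_OPERATION .*)', contents):
        matches = re.match(r'^DBT_OPERATION ({.*})$', part)
        if matches is not None:
            # json.loads stands in for yaml.safe_load in this sketch.
            yield 'operation', json.loads(matches.group(1))
        else:
            yield 'sql', part


for kind, payload in split_model(compiled_contents):
    print(kind, repr(payload))
```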
Empty file added dbt/adapters/redshift.py
Empty file.
Empty file added dbt/contracts/__init__.py
Empty file.
37 changes: 37 additions & 0 deletions dbt/contracts/connection.py
@@ -0,0 +1,37 @@
from voluptuous import Schema, Required, All, Any, Extra, Range
from voluptuous.error import MultipleInvalid

from dbt.exceptions import ValidationException
from dbt.logger import GLOBAL_LOGGER as logger


connection_contract = Schema({
Required('type'): Any('postgres', 'redshift'),
Required('state'): Any('init', 'open', 'closed', 'fail'),
Required('handle'): Any(None, object),
Required('credentials'): object,
})

postgres_credentials_contract = Schema({
Required('dbname'): str,
Required('host'): str,
Required('user'): str,
Required('pass'): str,
Required('port'): All(int, Range(min=0, max=65535)),
Required('schema'): str,
})

credentials_mapping = {
'postgres': postgres_credentials_contract,
'redshift': postgres_credentials_contract,
}

def validate_connection(connection):
try:
connection_contract(connection)

credentials_contract = credentials_mapping.get(connection.get('type'))
credentials_contract(connection.get('credentials'))
except MultipleInvalid as e:
logger.info(e)
raise ValidationException(str(e))
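
One detail worth spelling out: a voluptuous `Schema` rejects keys it does not declare unless configured otherwise, which is presumably why `acquire_connection` pops `type` and `threads` from the profile before validating. The sketch below approximates the credentials contract in plain Python to show which profiles would pass. It does not use voluptuous, `check_credentials` is a hypothetical helper, and the messages are illustrative rather than the library's exact wording.

```python
# Plain-Python approximation of postgres_credentials_contract; not voluptuous itself.
REQUIRED_TYPES = {
    'dbname': str,
    'host': str,
    'user': str,
    'pass': str,
    'port': int,
    'schema': str,
}


def check_credentials(credentials):
    """Return a list of human-readable problems; an empty list means valid."""
    problems = []
    for key, expected in REQUIRED_TYPES.items():
        if key not in credentials:
            problems.append('required key not provided: {}'.format(key))
        elif not isinstance(credentials[key], expected):
            problems.append('expected {} for {}'.format(expected.__name__, key))
    # Undeclared keys are rejected (assumed to mirror the schema's default).
    for key in credentials:
        if key not in REQUIRED_TYPES:
            problems.append('extra keys not allowed: {}'.format(key))
    port = credentials.get('port')
    if isinstance(port, int) and not 0 <= port <= 65535:
        problems.append('port out of range: {}'.format(port))
    return problems


profile = {'dbname': 'postgres', 'user': 'root', 'host': 'database',
           'pass': 'password', 'port': 5432, 'schema': 'public'}
print(check_credentials(profile))
print(check_credentials(dict(profile, threads=4, port=70000)))
```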
2 changes: 2 additions & 0 deletions dbt/exceptions.py
@@ -0,0 +1,2 @@
class ValidationException(Exception):
Review comment (Contributor):

👍

pass
1 change: 1 addition & 0 deletions dbt/flags.py
@@ -0,0 +1 @@
STRICT_MODE = False
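
The flag is a plain module-level global that `handle` overwrites once per invocation. A rough sketch of that flow follows, with a `SimpleNamespace` standing in for the `dbt.flags` module and a cut-down, hypothetical parser (the `prog` name and the single `run` subcommand are assumptions for illustration).

```python
import argparse
import types

# Stand-in for the dbt.flags module: one mutable, process-wide setting.
flags = types.SimpleNamespace(STRICT_MODE=False)


def parse_args(args):
    p = argparse.ArgumentParser(prog='dbt-sketch')
    p.add_argument('-S', '--strict', action='store_true',
                   help='Run schema validations at runtime.')
    subs = p.add_subparsers(dest='command')
    subs.add_parser('run')
    return p.parse_args(args)


def handle(args):
    parsed = parse_args(args)
    # Every invocation resets the global, so a non-strict run clears it again.
    flags.STRICT_MODE = parsed.strict
    return parsed


handle(['--strict', 'run'])
print(flags.STRICT_MODE)
handle(['run'])
print(flags.STRICT_MODE)
```

Because `--strict` is defined on the top-level parser in this sketch, it has to appear before the subcommand, which matches how the integration test base prepends it to the argument list.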
4 changes: 4 additions & 0 deletions dbt/main.py
@@ -7,6 +7,7 @@
import re

import dbt.version
+import dbt.flags as flags
import dbt.project as project
import dbt.task.run as run_task
import dbt.task.compile as compile_task
@@ -37,6 +38,8 @@ def handle(args):

initialize_logger(parsed.debug)

+flags.STRICT_MODE = parsed.strict
+
# this needs to happen after args are parsed so we can determine the correct profiles.yml file
if not config.send_anonymous_usage_stats(parsed.profiles_dir):
dbt.tracking.do_not_track()
@@ -131,6 +134,7 @@ def parse_args(args):
p = argparse.ArgumentParser(prog='dbt: data build tool', formatter_class=argparse.RawTextHelpFormatter)
p.add_argument('--version', action='version', version=dbt.version.get_version_information(), help="Show version information")
p.add_argument('-d', '--debug', action='store_true', help='Display debug logging during dbt execution. Useful for debugging and making bug reports.')
+p.add_argument('-S', '--strict', action='store_true', help='Run schema validations at runtime. This will surface bugs in dbt, but may incur a speed penalty.')
Review comment (Contributor):

👍


subs = p.add_subparsers()

35 changes: 3 additions & 32 deletions dbt/runner.py
@@ -10,6 +10,7 @@
import yaml
from datetime import datetime

+from dbt.adapters.factory import get_adapter
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.compilation import compile_string
from dbt.linker import Linker
@@ -94,38 +95,8 @@ def execute_list(self, queries, source):
return status

def execute_contents(self, target, model):
-parts = re.split(r'-- (DBT_OPERATION .*)', model.compiled_contents)
-handle = None
-
-status = 'None'
-for i, part in enumerate(parts):
-matches = re.match(r'^DBT_OPERATION ({.*})$', part)
-if matches is not None:
-instruction_string = matches.groups()[0]
-instruction = yaml.safe_load(instruction_string)
-function = instruction['function']
-kwargs = instruction['args']
-
-func_map = {
-'expand_column_types_if_needed': lambda kwargs: self.schema_helper.expand_column_types_if_needed(**kwargs),
-}
-
-func_map[function](kwargs)
-else:
-try:
-handle, status = self.schema_helper.execute_without_auto_commit(part, handle)
-except psycopg2.ProgrammingError as e:
-if "permission denied for" in e.diag.message_primary:
-raise RuntimeError(dbt.schema.READ_PERMISSION_DENIED_ERROR.format(
-model=model.name,
-error=str(e).strip(),
-user=target.user,
-))
-else:
-raise
-
-handle.commit()
-return status
+return get_adapter(target).execute_model(
Review comment (Contributor):

this is so good

+self.project, target, model)

class ModelRunner(BaseRunner):
run_type = 'run'
1 change: 1 addition & 0 deletions requirements.txt
@@ -9,3 +9,4 @@ paramiko==2.0.1
sshtunnel==0.0.8.2
snowplow-tracker==0.7.2
celery==3.1.23
Review comment (Contributor):

celery is a dependency of snowplow-tracker. Do we still need to list it explicitly? I'm unsure how it ended up in the requirements file.

+voluptuous==0.9.3
3 changes: 1 addition & 2 deletions test/integration.sh
@@ -1,7 +1,6 @@
#!/bin/bash

-. /usr/src/app/test/setup.sh
-workon dbt
+pip install tox

cd /usr/src/app
tox -e integration-py27,integration-py35
2 changes: 2 additions & 0 deletions test/integration/base.py
@@ -97,6 +97,8 @@ def run_dbt(self, args=None):
if args is None:
args = ["run"]

+args = ["--strict"] + args
+
return dbt.handle(args)

def run_sql_file(self, path):
2 changes: 1 addition & 1 deletion test/setup.sh
@@ -6,5 +6,5 @@ mkvirtualenv dbt

cd /usr/src/app

-pip install -r requirements.txt
+pip install -r requirements.txt
pip install -r dev_requirements.txt
6 changes: 6 additions & 0 deletions test/unit.sh
@@ -0,0 +1,6 @@
#!/bin/bash

pip install tox

cd /usr/src/app
tox -e unit-py27,unit-py35
38 changes: 38 additions & 0 deletions test/unit/test_postgres_adapter.py
@@ -0,0 +1,38 @@
import unittest

import dbt.flags as flags

from dbt.adapters.postgres import PostgresAdapter
from dbt.exceptions import ValidationException
from dbt.logger import GLOBAL_LOGGER as logger


class TestPostgresAdapter(unittest.TestCase):

def setUp(self):
flags.STRICT_MODE = True

self.profile = {
'dbname': 'postgres',
'user': 'root',
'host': 'database',
'pass': 'password',
'port': 5432,
'schema': 'public'
}

def test_acquire_connection_validations(self):
try:
connection = PostgresAdapter.acquire_connection(self.profile)
self.assertEqual(connection.get('type'), 'postgres')
except ValidationException as e:
self.fail('got ValidationException: {}'.format(str(e)))
except BaseException as e:
self.fail('validation failed with unknown exception: {}'
.format(str(e)))

def test_acquire_connection(self):
connection = PostgresAdapter.acquire_connection(self.profile)

self.assertEqual(connection.get('state'), 'open')
self.assertIsNotNone(connection.get('handle'))
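
Both tests above need a reachable postgres server (host `database` in the docker harness). If a database-free check of the state handling were wanted, one option would be to inject the connect function. The following is only a sketch under that assumption: `open_connection` here is a simplified, hypothetical variant that takes a `connect` callable, which the adapter in this PR does not do.

```python
import unittest


def open_connection(connection, connect):
    """Simplified variant of the adapter method with an injectable connect()."""
    if connection.get('state') == 'open':
        return connection

    result = connection.copy()
    try:
        result['handle'] = connect(connection['credentials'])
        result['state'] = 'open'
    except Exception:
        # The real adapter catches psycopg2.Error; a broad catch keeps this stdlib-only.
        result['handle'] = None
        result['state'] = 'fail'
    return result


class TestOpenConnection(unittest.TestCase):

    def setUp(self):
        self.connection = {'type': 'postgres', 'state': 'init',
                           'handle': None,
                           'credentials': {'dbname': 'postgres'}}

    def test_success_marks_connection_open(self):
        # A fake connect() that returns an opaque handle object.
        result = open_connection(self.connection, lambda creds: object())
        self.assertEqual(result['state'], 'open')
        self.assertIsNotNone(result['handle'])

    def test_failure_marks_connection_failed(self):
        def refuse(creds):
            raise RuntimeError('connection refused')

        result = open_connection(self.connection, refuse)
        self.assertEqual(result['state'], 'fail')
        self.assertIsNone(result['handle'])
```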