Skip to content

Commit

Permalink
per-thread connections
Browse files Browse the repository at this point in the history
parsing now always opens a connection, instead of waiting to need it
  • Loading branch information
Jacob Beck committed Mar 6, 2019
1 parent 2ad1166 commit f165bf8
Show file tree
Hide file tree
Showing 31 changed files with 545 additions and 605 deletions.
260 changes: 108 additions & 152 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import abc
import multiprocessing
import os

import six

import dbt.exceptions
import dbt.flags
from dbt.api import APIObject
from dbt.compat import abstractclassmethod
from dbt.compat import abstractclassmethod, get_ident
from dbt.contracts.connection import Connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.utils import translate_aliases
Expand Down Expand Up @@ -71,6 +72,7 @@ class BaseConnectionManager(object):
- open
- begin
- commit
- clear_transaction
- execute
You must also set the 'TYPE' class attribute with a class-unique constant
Expand All @@ -80,83 +82,93 @@ class BaseConnectionManager(object):

def __init__(self, profile):
self.profile = profile
self.in_use = {}
self.available = []
self.thread_connections = {}
self.lock = multiprocessing.RLock()
self._set_initial_connections()

def _set_initial_connections(self):
self.available = []
# set up the array of connections in the 'init' state.
# we add a magic number, 2 because there are overhead connections,
# one for pre- and post-run hooks and other misc operations that occur
# before the run starts, and one for integration tests.
for idx in range(self.profile.threads + 2):
self.available.append(self._empty_connection())

def _empty_connection(self):
return Connection(
type=self.TYPE,
name=None,
state='init',
transaction_open=False,
handle=None,
credentials=self.profile.credentials
)

@staticmethod
def get_thread_identifier():
# note that get_ident() may be re-used, but we should never experience
# that within a single process
return (os.getpid(), get_ident())

def get_thread_connection(self):
key = self.get_thread_identifier()
with self.lock:
if key not in self.thread_connections:
raise RuntimeError(
'connection never acquired for thread {}, have {}'
.format(key, list(self.thread_connections))
)
return self.thread_connections[key]

def get_if_exists(self):
key = self.get_thread_identifier()
with self.lock:
return self.thread_connections.get(key)

def clear_thread_connection(self):
key = self.get_thread_identifier()
with self.lock:
if key in self.thread_connections:
del self.thread_connections[key]

def clear_transaction(self):
"""Clear any existing transactions."""
conn = self.get_thread_connection()
if conn is not None:
self.begin()
self.commit()

@abc.abstractmethod
def exception_handler(self, sql, connection_name='master'):
def exception_handler(self, sql):
"""Create a context manager that handles exceptions caused by database
interactions.
:param str sql: The SQL string that the block inside the context
manager is executing.
:param str connection_name: The name of the connection being used
:return: A context manager that handles exceptions raised by the
underlying database.
"""
raise dbt.exceptions.NotImplementedException(
'`exception_handler` is not implemented for this adapter!')

def get(self, name=None):
"""This is thread-safe as long as two threads don't use the same
"name".
"""
def set_connection_name(self, name=None):
if name is None:
# if a name isn't specified, we'll re-use a single handle
# named 'master'
name = 'master'

with self.lock:
if name in self.in_use:
return self.in_use[name]
conn = self.get_if_exists()
thread_id_key = self.get_thread_identifier()

logger.debug('Acquiring new {} connection "{}".'
.format(self.TYPE, name))

if not self.available:
raise dbt.exceptions.InternalException(
'Tried to request a new connection "{}" but '
'the maximum number of connections are already '
'allocated!'.format(name)
)
if conn is None:
conn = Connection(
type=self.TYPE,
name=None,
state='init',
transaction_open=False,
handle=None,
credentials=self.profile.credentials
)
self.thread_connections[thread_id_key] = conn

connection = self.available.pop()
# connection is temporarily neither in use nor available, but both
# collections are in a sane state, so we can release the lock.
if conn.name == name and conn.state == 'open':
return conn

# this potentially calls open(), but does so without holding the lock
connection = self.assign(connection, name)
logger.debug('Acquiring new {} connection "{}".'
.format(self.TYPE, name))

with self.lock:
if name in self.in_use:
raise dbt.exceptions.InternalException(
'Two threads concurrently tried to get the same name: {}'
.format(name)
)
self.in_use[name] = connection
if conn.state == 'open':
logger.debug(
'Re-using an available connection from the pool (formerly {}).'
.format(conn.name))
else:
logger.debug('Opening a new connection, currently in state {}'
.format(conn.state))
self.open(conn)

return connection
conn.name = name
return conn

@abc.abstractmethod
def cancel_open(self):
Expand All @@ -183,81 +195,39 @@ def open(cls, connection):
'`open` is not implemented for this adapter!'
)

def assign(self, conn, name):
"""Open a connection if it's not already open, and assign it name
regardless.
The caller is responsible for putting the assigned connection into the
in_use collection.
:param Connection conn: A connection, in any state.
:param str name: The name of the connection to set.
"""
if name is None:
name = 'master'

conn.name = name

if conn.state == 'open':
logger.debug('Re-using an available connection from the pool.')
else:
logger.debug('Opening a new connection, currently in state {}'
.format(conn.state))
conn = self.open(conn)

return conn

def _release_connection(self, conn):
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
conn.name = None
else:
self.close(conn)

def release(self, name):
def release(self):
with self.lock:
if name not in self.in_use:
conn = self.get_if_exists()
if conn is None:
return

to_release = self.in_use.pop(name)
# to_release is temporarily neither in use nor available, but both
# collections are in a sane state, so we can release the lock.

try:
self._release_connection(to_release)
except:
# if rollback or close failed, replace our busted connection with
# a new one
to_release = self._empty_connection()
if conn.state == 'open':
if conn.transaction_open is True:
self._rollback(conn)
else:
self.close(conn)
except Exception:
# if rollback or close failed, remove our busted connection
self.clear_thread_connection()
raise
finally:
# now that this connection has been rolled back and the name reset,
# or the connection has been closed, put it back on the available
# list
with self.lock:
self.available.append(to_release)

def cleanup_all(self):
with self.lock:
for name, connection in self.in_use.items():
if connection.state != 'closed':
for connection in self.thread_connections.values():
if connection.state not in {'closed', 'init'}:
logger.debug("Connection '{}' was left open."
.format(name))
.format(connection.name))
else:
logger.debug("Connection '{}' was properly closed."
.format(name))

conns_in_use = list(self.in_use.values())
for conn in conns_in_use + self.available:
self.close(conn)
.format(connection.name))
self.close(connection)

# garbage collect these connections
self.in_use.clear()
self._set_initial_connections()
self.thread_connections.clear()

@abc.abstractmethod
def begin(self, name):
def begin(self):
"""Begin a transaction. (passable)
:param str name: The name of the connection to use.
Expand All @@ -266,34 +236,32 @@ def begin(self, name):
'`begin` is not implemented for this adapter!'
)

def get_if_exists(self, name):
if name is None:
name = 'master'

if self.in_use.get(name) is None:
return

return self.get(name)

@abc.abstractmethod
def commit(self, connection):
"""Commit a transaction. (passable)
:param str name: The name of the connection to use.
"""
def commit(self):
"""Commit a transaction. (passable)"""
raise dbt.exceptions.NotImplementedException(
'`commit` is not implemented for this adapter!'
)

def _rollback_handle(self, connection):
@classmethod
def _rollback_handle(cls, connection):
"""Perform the actual rollback operation."""
connection.handle.rollback()

def _rollback(self, connection):
"""Roll back the given connection.
@classmethod
def _close_handle(cls, connection):
"""Perform the actual close operation."""
# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
logger.debug('On {}: Close'.format(connection.name))
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))

The connection does not have to be in in_use or available, so this
operation does not require the lock.
@classmethod
def _rollback(cls, connection):
"""Roll back the given connection.
"""
if dbt.flags.STRICT_MODE:
assert isinstance(connection, Connection)
Expand All @@ -304,7 +272,7 @@ def _rollback(self, connection):
'it does not have one open!'.format(connection.name))

logger.debug('On {}: ROLLBACK'.format(connection.name))
self._rollback_handle(connection)
cls._rollback_handle(connection)

connection.transaction_open = False

Expand All @@ -320,40 +288,28 @@ def close(cls, connection):
return connection

if connection.transaction_open and connection.handle:
connection.handle.rollback()
cls._rollback_handle(connection)
connection.transaction_open = False

# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
connection.handle.close()
else:
logger.debug('On {}: No close available on handle'
.format(connection.name))

cls._close_handle(connection)
connection.state = 'closed'

return connection

def commit_if_has_connection(self, name):
def commit_if_has_connection(self):
"""If the named connection exists, commit the current transaction.
:param str name: The name of the connection to use.
"""
connection = self.in_use.get(name)
connection = self.get_if_exists()
if connection:
self.commit(connection)

def clear_transaction(self, conn_name='master'):
conn = self.begin(conn_name)
self.commit(conn)
return conn_name
self.commit()

@abc.abstractmethod
def execute(self, sql, name=None, auto_begin=False, fetch=False):
def execute(self, sql, auto_begin=False, fetch=False):
"""Execute the given SQL.
:param str sql: The sql to execute.
:param Optional[str] name: The name to use for the connection.
:param bool auto_begin: If set, and dbt is not currently inside a
transaction, automatically begin one.
:param bool fetch: If set, fetch results.
Expand Down
Loading

0 comments on commit f165bf8

Please sign in to comment.