Skip to content

Commit

Permalink
Merge pull request #1207 from underyx/patch-1
Browse files Browse the repository at this point in the history
Allow disabling periodic committing when inserting rows with DbApiHook
  • Loading branch information
r39132 committed Apr 13, 2016
2 parents 9689159 + b940706 commit 5d15d68
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions airflow/hooks/dbapi_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ def get_conn(self):
def get_pandas_df(self, sql, parameters=None):
'''
Executes the sql and returns a pandas dataframe
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
'''
import pandas.io.sql as psql
conn = self.get_conn()
Expand All @@ -56,6 +62,12 @@ def get_pandas_df(self, sql, parameters=None):
def get_records(self, sql, parameters=None):
'''
Executes the sql and returns a set of records.
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
'''
conn = self.get_conn()
cur = self.get_cursor()
Expand All @@ -70,7 +82,13 @@ def get_records(self, sql, parameters=None):

def get_first(self, sql, parameters=None):
'''
Executes the sql and returns a set of records.
Executes the sql and returns the first resulting row.
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
'''
conn = self.get_conn()
cur = conn.cursor()
Expand All @@ -92,6 +110,11 @@ def run(self, sql, autocommit=False, parameters=None):
:param sql: the sql statement to be executed (str) or a list of
sql statements to execute
:type sql: str or list
:param autocommit: What to set the connection's autocommit setting to
before executing the query.
:type autocommit: bool
:param parameters: The parameters to render the SQL query with.
:type parameters: mapping or iterable
"""
conn = self.get_conn()
if isinstance(sql, basestring):
Expand Down Expand Up @@ -124,6 +147,16 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
"""
A generic way to insert a set of tuples into a table,
the whole set of inserts is treated as one transaction
:param table: Name of the target table
:type table: str
:param rows: The rows to insert into the table
:type rows: iterable of tuples
:param target_fields: The names of the columns to fill in the table
:type target_fields: iterable of strings
:param commit_every: The maximum number of rows to insert in one
transaction. Set to 0 to insert all rows in one transaction.
:type commit_every: int
"""
if target_fields:
target_fields = ", ".join(target_fields)
Expand All @@ -147,7 +180,7 @@ def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
target_fields,
",".join(values))
cur.execute(sql)
if i % commit_every == 0:
if commit_every and i % commit_every == 0:
conn.commit()
logging.info(
"Loaded {i} into {table} rows so far".format(**locals()))
Expand All @@ -173,5 +206,10 @@ def _serialize_cell(cell):
def bulk_load(self, table, tmp_file):
"""
Loads a tab-delimited file into a database table
:param table: The name of the target table
:type table: str
:param tmp_file: The path of the file to load into the table
:type tmp_file: str
"""
raise NotImplementedError()

0 comments on commit 5d15d68

Please sign in to comment.