Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bci-flolas committed Jul 14, 2021
1 parent 272b5aa commit bc06689
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 9 deletions.
9 changes: 4 additions & 5 deletions airflow/providers/teradata/hooks/ttu.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def get_conn(self) -> dict:
password=connection.password,
host=connection.host,
ttu_log_folder=extras.get('ttu_log_folder', conf.get('logging', 'BASE_LOG_FOLDER')),
ttu_max_sessions=extras.get('ttu_max_sessions', 1),
console_output_encoding=extras.get('console_output_encoding', 'utf-8'),
bteq_session_encoding=extras.get('bteq_session_encoding', 'ASCII'),
bteq_output_width=extras.get('bteq_output_width', 65531),
Expand Down Expand Up @@ -125,7 +124,7 @@ def execute_bteq(self, bteq, xcom_push_flag=False):
if xcom_push_flag:
return line

def execute_tdload(self, input_file, table, delimiter=';', working_database=None, encoding='UTF8', xcom_push_flag=False, raise_on_rows_error=False, raise_on_rows_duplicated=False, debug=False, restart_limit=0):
def execute_tdload(self, input_file, table, delimiter=';', working_database=None, encoding='UTF8', xcom_push_flag=False, raise_on_rows_error=False, raise_on_rows_duplicated=False, debug=False, max_sessions=1, restart_limit=0):
"""
Load a CSV file to Teradata Table (previously created) using tdload binary.
Note: You need to strip header of the CSV. tdload only accepts rows, not header.
Expand Down Expand Up @@ -160,7 +159,7 @@ def execute_tdload(self, input_file, table, delimiter=';', working_database=None
delimiter,
fload_out_path,
fload_checkpoint_path,
conn['ttu_max_sessions'],
max_sessions,
working_database,
debug,
restart_limit
Expand Down Expand Up @@ -203,7 +202,7 @@ def execute_tdload(self, input_file, table, delimiter=';', working_database=None
return line


def execute_tptexport(self, sql, output_file, delimiter = ';', encoding='UTF8', spool_mode='SPOOL', xcom_push_flag=False, double_quote_varchar=True):
def execute_tptexport(self, sql, output_file, delimiter = ';', encoding='UTF8', spool_mode='SPOOL', xcom_push_flag=False, double_quote_varchar=True, max_sessions=1):
"""
Export a table from Teradata Table using tpt binary.
Note: The exported CSV file does not contains header row
Expand Down Expand Up @@ -241,7 +240,7 @@ def execute_tptexport(self, sql, output_file, delimiter = ';', encoding='UTF8',
conn['host'],
conn['login'],
conn['password'],
conn['ttu_max_sessions'],
max_sessions,
), 'utf_8'))
f.flush()
fname = f.name
Expand Down
6 changes: 5 additions & 1 deletion airflow/providers/teradata/operators/fastexport.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
spool_mode: str = 'SPOOL',
xcom_push: bool = True,
ttu_conn_id: str = 'ttu_default',
max_sessions: Optional[int] = 1,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -54,6 +55,7 @@ def __init__(
self.xcom_push = xcom_push
self._hook = None
self.ttu_conn_id = ttu_conn_id
self.max_sessions = max_sessions

def execute(self, context):
"""
Expand All @@ -65,6 +67,8 @@ def execute(self, context):
delimiter=self.delimiter,
encoding=self.encoding,
spool_mode=self.spool_mode,
xcom_push_flag=self.xcom_push)
xcom_push_flag=self.xcom_push,
max_sessions=self.max_sessions
)
def on_kill(self):
self._hook.on_kill()
7 changes: 6 additions & 1 deletion airflow/providers/teradata/operators/fastload.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def __init__(
raise_on_rows_duplicated: bool = True,
xcom_push: bool = True,
ttu_conn_id: str = 'ttu_default',
max_sessions: Optional[int] = 1,
debug: Optional[bool] = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -72,6 +74,8 @@ def __init__(
self.preoperator_bteq = preoperator_bteq
self.raise_on_rows_error = raise_on_rows_error
self.raise_on_rows_duplicated = raise_on_rows_duplicated
self.max_sessions = max_sessions
self.debug = debug

def execute(self, context):
"""
Expand All @@ -94,7 +98,8 @@ def execute(self, context):
xcom_push_flag=self.xcom_push,
raise_on_rows_error=self.raise_on_rows_error,
raise_on_rows_duplicated=self.raise_on_rows_duplicated,
debug=self.debug
debug=self.debug,
max_sessions=self.max_sessions
)
def on_kill(self):
self._hook.on_kill()
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.7
current_version = 1.0.8
commit = True
tag = True

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@
packages=find_namespace_packages(include=['airflow.providers.teradata', 'airflow.providers.teradata.*']),
setup_requires=['setuptools', 'wheel'],
url='https://github.com/flolas/apache_airflow_providers_teradata',
version='1.0.7',
version='1.0.8',
zip_safe=False,
)

0 comments on commit bc06689

Please sign in to comment.