From edf3857ca3d523894e55df085a93bca174a173fd Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Fri, 24 Jul 2015 17:56:55 +0200 Subject: [PATCH 1/8] add jdbc settings to extras to allow storage and usage of jdbc connections Conflicts: airflow/www/app.py --- airflow/hooks/jdbc_hook.py | 21 ++++++------ airflow/operators/jdbc_operator.py | 6 ++-- airflow/www/app.py | 25 ++++++++++++-- airflow/www/static/jdbc.js | 34 +++++++++++++++++++ .../www/templates/airflow/conn_create.html | 6 ++++ airflow/www/templates/airflow/conn_edit.html | 6 ++++ 6 files changed, 82 insertions(+), 16 deletions(-) create mode 100644 airflow/www/static/jdbc.js create mode 100644 airflow/www/templates/airflow/conn_create.html create mode 100644 airflow/www/templates/airflow/conn_edit.html diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index 6e0ed5828a155..858555c4fe3f4 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -26,34 +26,35 @@ class JdbcHook(BaseHook): a '.sql' extensions. """ def __init__( - self, jdbc_url, jdbc_driver_name, jdbc_driver_loc,host=None, login=None, + self, jdbc_driver_name = None, jdbc_driver_loc = None,host=None, login=None, psw=None, db=None, port=None, extra=None, conn_id=None): - self.jdbc_driver_loc = jdbc_driver_loc - self.jdbc_driver_name = jdbc_driver_name if (conn_id is None): self.host = host self.login = login self.psw = psw - self.db = db - self.port = port + #self.db = db + #self.port = port self.extra = extra + self.jdbc_driver_loc = jdbc_driver_loc + self.jdbc_driver_name = jdbc_driver_name else: conn = self.get_connection(conn_id) self.host = conn.host self.login = conn.login self.psw = conn.password - self.db = conn.schema - self.port = conn.port + #self.db = conn.schema + #self.port = conn.port self.extra = conn.extra + self.jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') + self.jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') - self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra) + #self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra) def get_conn(self): - logging.info("Trying to connect using jdbc url: " + self.jdbc_url) conn = jaydebeapi.connect(self.jdbc_driver_name, - [str(self.jdbc_url), str(self.login), str(self.psw)], + [str(self.host), str(self.login), str(self.psw)], self.jdbc_driver_loc,) return conn diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 40a97538201b7..fddad02878998 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -34,13 +34,11 @@ class JdbcOperator(BaseOperator): @apply_defaults def __init__( - self, sql, - jdbc_url, jdbc_driver_name, jdbc_driver_loc, + self, sql, jdbc_driver_name=None, jdbc_driver_loc=None, conn_id='jdbc_default', autocommit=False, *args, **kwargs): super(JdbcOperator, self).__init__(*args, **kwargs) - self.jdbc_url=jdbc_url self.jdbc_driver_name=jdbc_driver_name self.jdbc_driver_loc=jdbc_driver_loc self.sql = sql @@ -49,6 +47,6 @@ def __init__( def execute(self, context): logging.info('Executing: ' + self.sql) - self.hook = JdbcHook(conn_id=self.conn_id,jdbc_driver_loc=self.jdbc_driver_loc, jdbc_driver_name=self.jdbc_driver_name,jdbc_url=self.jdbc_url) + self.hook = JdbcHook(conn_id=self.conn_id,jdbc_driver_loc=self.jdbc_driver_loc, jdbc_driver_name=self.jdbc_driver_name) for row in self.hook.get_records(self.sql, self.autocommit): logging.info('Result: ' + ','.join(map(str,row)) ) \ No newline at end of file diff --git a/airflow/www/app.py b/airflow/www/app.py index a1157e6d7dc97..a7d6437b4908b 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -1578,13 +1578,17 @@ class TaskInstanceModelView(ModelViewOnly): # Hack to not add this view to the menu admin._menu = admin._menu[:-1] - class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): + create_template = 'airflow/conn_create.html' + edit_template = 'airflow/conn_edit.html' verbose_name = "Connection" verbose_name_plural = "Connections" column_default_sort = ('conn_id', False) column_list = ('conn_id', 'conn_type', 'host', 'port') form_overrides = dict(password=VisiblePasswordField) + form_extra_fields = { 'jdbc_drv_path' : StringField('Driver Path'), + 'jdbc_drv_clsname': StringField('Driver Class'), + } form_choices = { 'conn_type': [ ('ftp', 'FTP',), @@ -1602,13 +1606,30 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): ('samba', 'Samba',), ('sqlite', 'Sqlite',), ] + } + + def on_model_change(self, form, model, is_created): + formdata = form.data + if formdata['conn_type'] == 'jdbc': + jdbc = {key:formdata[key] for key in ('jdbc_drv_path','jdbc_drv_clsname' + #, 'jdbc_conn_url' + ) if key in formdata} + model.extra = json.dumps(jdbc) + + def on_form_prefill(self, form, id): + data = form.data + if 'extra' in data and data['extra'] != None: + d = json.loads(data['extra']) + #form.jdbc_conn_url.data = d['jdbc_conn_url'] + form.jdbc_drv_path.data = d['jdbc_drv_path'] + form.jdbc_drv_clsname.data = d['jdbc_drv_clsname'] + mv = ConnectionModelView( models.Connection, Session, name="Connections", category="Admin") admin.add_view(mv) - class UserModelView(wwwutils.SuperUserMixin, AirflowModelView): verbose_name = "User" verbose_name_plural = "Users" diff --git a/airflow/www/static/jdbc.js b/airflow/www/static/jdbc.js new file mode 100644 index 0000000000000..577f4b4d55936 --- /dev/null +++ b/airflow/www/static/jdbc.js @@ -0,0 +1,34 @@ +/** + * Created by janomar on 23/07/15. + */ + //ugly, refactor soon + function toggleJdbc(connectionType) { + if (connectionType == 'jdbc') { + $("input[name='port']").parent().parent().hide(); + $("input[name='schema']").parent().parent().hide(); + //$("input[name='login']").parent().parent().hide(); + //$("input[name='password']").parent().parent().hide(); + $("input[name='extra']").parent().parent().hide(); + $("input[name='jdbc_drv_clsname']").parent().parent().show(); + $("input[name='jdbc_drv_path']").parent().parent().show(); + $("label[for='host']").text("Connection URL"); + } else { + $("input[name='port']").parent().parent().show(); + $("input[name='schema']").parent().parent().show(); + //$("input[name='login']").parent().parent().show(); + //$("input[name='password']").parent().parent().show(); + $("input[name='extra']").parent().parent().show(); + $("input[name='jdbc_drv_clsname']").parent().parent().hide(); + $("input[name='jdbc_drv_path']").parent().parent().hide(); + $("label[for='host']").text("Host"); + } + } + + jQuery(document).ready(function() { + var conn_type =jQuery("#conn_type").val(); + jQuery("#conn_type").on('change', function(e) { + conn_type = jQuery("#conn_type").val(); + toggleJdbc(conn_type); + }); + toggleJdbc(conn_type); +}); \ No newline at end of file diff --git a/airflow/www/templates/airflow/conn_create.html b/airflow/www/templates/airflow/conn_create.html new file mode 100644 index 0000000000000..e878842bfdd7a --- /dev/null +++ b/airflow/www/templates/airflow/conn_create.html @@ -0,0 +1,6 @@ +{% extends 'airflow/model_create.html' %} + +{% block tail %} + {{ super() }} + +{% endblock %} diff --git a/airflow/www/templates/airflow/conn_edit.html b/airflow/www/templates/airflow/conn_edit.html new file mode 100644 index 0000000000000..2c45f6849aee0 --- /dev/null +++ b/airflow/www/templates/airflow/conn_edit.html @@ -0,0 +1,6 @@ +{% extends 'airflow/model_edit.html' %} + +{% block tail %} + {{ super() }} + +{% endblock %} From 6b7f001a607f2527f88352d7f1aaf608d91f1557 Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Sat, 25 Jul 2015 14:23:15 +0200 Subject: [PATCH 2/8] add StringField to imports --- airflow/www/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/www/app.py b/airflow/www/app.py index a7d6437b4908b..10aa6b4c23f07 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -21,7 +21,7 @@ import sqlalchemy as sqla from wtforms import ( widgets, - Form, DateTimeField, SelectField, TextAreaField, PasswordField) + Form, DateTimeField, SelectField, TextAreaField, PasswordField, StringField) from pygments import highlight, lexers from pygments.formatters import HtmlFormatter From 7ea51e619e365e6193937790c24d150f6f811e17 Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Sun, 26 Jul 2015 11:40:22 +0200 Subject: [PATCH 3/8] use dbapihook as base for jdbchook --- airflow/hooks/jdbc_hook.py | 89 ++++++++++++++---------------- airflow/models.py | 2 +- airflow/operators/jdbc_operator.py | 10 ++-- 3 files changed, 45 insertions(+), 56 deletions(-) diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index 858555c4fe3f4..d715da651362b 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -3,9 +3,9 @@ import logging import jaydebeapi -from airflow.hooks.base_hook import BaseHook +from airflow.hooks.dbapi_hook import DbApiHook -class JdbcHook(BaseHook): +class JdbcHook(DbApiHook): """ General hook for jdbc db access. @@ -13,6 +13,8 @@ class JdbcHook(BaseHook): Raises an airflow error if the given connection id doesn't exist. Otherwise host, port, schema, username and password can be specified on the fly. + + :param jdbc_url: jdbc connection url :type jdbc_url: string :param jdbc_driver_name: jdbc driver name @@ -25,29 +27,37 @@ class JdbcHook(BaseHook): :type sql: string or string pointing to a template file. File must have a '.sql' extensions. """ + + + conn_name_attr = 'jdbc_conn_id' + default_conn_name = 'jdbc_default' + supports_autocommit = True + def __init__( - self, jdbc_driver_name = None, jdbc_driver_loc = None,host=None, login=None, - psw=None, db=None, port=None, extra=None, conn_id=None): - - if (conn_id is None): - self.host = host - self.login = login - self.psw = psw - #self.db = db - #self.port = port - self.extra = extra - self.jdbc_driver_loc = jdbc_driver_loc - self.jdbc_driver_name = jdbc_driver_name - else: - conn = self.get_connection(conn_id) - self.host = conn.host - self.login = conn.login - self.psw = conn.password - #self.db = conn.schema - #self.port = conn.port - self.extra = conn.extra - self.jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') - self.jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') + self, *args, **kwargs): + + super(JdbcHook,self).__init__(*args,**kwargs) + + #conn_id = getattr(self, self.conn_name_attr) + #if (conn_id is None): + # self.host = host + # self.login = login + # self.psw = psw + # #self.db = db + # #self.port = port + # self.extra = extra + # self.jdbc_driver_loc = jdbc_driver_loc + # self.jdbc_driver_name = jdbc_driver_name + #else: + conn = self.get_connection(getattr(self, self.conn_name_attr)) + self.host = conn.host + self.login = conn.login + self.psw = conn.password + #self.db = conn.schema + #self.port = conn.port + self.extra = conn.extra + self.jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') + self.jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') #self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra) @@ -58,32 +68,13 @@ def get_conn(self): self.jdbc_driver_loc,) return conn - def get_records(self, sql, autocommit=False): - ''' - Executes the sql and returns a set of records. - ''' - conn = self.get_conn() - conn.jconn.autocommit = autocommit - cur = conn.cursor() - cur.execute(sql) - rows = [] if not cur._rs else cur.fetchall() - cur.close() - conn.close() - return rows - - def get_pandas_df(self, sql): - ''' - Executes the sql and returns a pandas dataframe - ''' - import pandas.io.sql as psql - conn = self.get_conn() - df = psql.read_sql(sql, con=conn) - conn.close() - return df - - def run(self, sql, autocommit=False): + def run(self, sql, autocommit=False, parameters=None): + """ + Runs a command + """ conn = self.get_conn() - conn.jconn.autocommit = autocommit + if self.supports_autocommit: + conn.jconn.autocommit = autocommit cur = conn.cursor() cur.execute(sql) conn.commit() diff --git a/airflow/models.py b/airflow/models.py index 4899d7c31cb1b..64a83ab661bf9 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -321,7 +321,7 @@ def get_hook(self): elif self.conn_type == 'sqlite': return hooks.SqliteHook(sqlite_conn_id=self.conn_id) elif self.conn_type == 'jdbc': - return hooks.JdbcHook(conn_id=self.conn_id) + return hooks.JdbcHook(jdbc_conn_id=self.conn_id) except: return None diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index fddad02878998..296203554025e 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -34,19 +34,17 @@ class JdbcOperator(BaseOperator): @apply_defaults def __init__( - self, sql, jdbc_driver_name=None, jdbc_driver_loc=None, - conn_id='jdbc_default', autocommit=False, + self, sql, + jdbc_conn_id='jdbc_default', autocommit=False, *args, **kwargs): super(JdbcOperator, self).__init__(*args, **kwargs) - self.jdbc_driver_name=jdbc_driver_name - self.jdbc_driver_loc=jdbc_driver_loc self.sql = sql - self.conn_id = conn_id + self.jdbc_conn_id = jdbc_conn_id self.autocommit = autocommit def execute(self, context): logging.info('Executing: ' + self.sql) - self.hook = JdbcHook(conn_id=self.conn_id,jdbc_driver_loc=self.jdbc_driver_loc, jdbc_driver_name=self.jdbc_driver_name) + self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id) for row in self.hook.get_records(self.sql, self.autocommit): logging.info('Result: ' + ','.join(map(str,row)) ) \ No newline at end of file From 0726c13a8ab2809160d414e52fd22f67254233af Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Sun, 26 Jul 2015 12:16:02 +0200 Subject: [PATCH 4/8] use css to toggle visibility depending on connection type --- airflow/www/static/jdbc.js | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/airflow/www/static/jdbc.js b/airflow/www/static/jdbc.js index 577f4b4d55936..f3b42a637c517 100644 --- a/airflow/www/static/jdbc.js +++ b/airflow/www/static/jdbc.js @@ -1,25 +1,17 @@ /** * Created by janomar on 23/07/15. */ - //ugly, refactor soon function toggleJdbc(connectionType) { - if (connectionType == 'jdbc') { - $("input[name='port']").parent().parent().hide(); - $("input[name='schema']").parent().parent().hide(); - //$("input[name='login']").parent().parent().hide(); - //$("input[name='password']").parent().parent().hide(); - $("input[name='extra']").parent().parent().hide(); - $("input[name='jdbc_drv_clsname']").parent().parent().show(); - $("input[name='jdbc_drv_path']").parent().parent().show(); + isJdbc = connectionType == 'jdbc' + $("#port").parent().parent().toggleClass('hide', isJdbc) + $("#schema").parent().parent().toggleClass('hide', isJdbc) + $("#extra").parent().parent().toggleClass('hide', isJdbc) + $("#jdbc_drv_clsname").parent().parent().toggleClass('hide', !isJdbc) + $("#jdbc_drv_path").parent().parent().toggleClass('hide', !isJdbc) + + if (isJdbc) { $("label[for='host']").text("Connection URL"); } else { - $("input[name='port']").parent().parent().show(); - $("input[name='schema']").parent().parent().show(); - //$("input[name='login']").parent().parent().show(); - //$("input[name='password']").parent().parent().show(); - $("input[name='extra']").parent().parent().show(); - $("input[name='jdbc_drv_clsname']").parent().parent().hide(); - $("input[name='jdbc_drv_path']").parent().parent().hide(); $("label[for='host']").text("Host"); } } From a61f28048fb16c2016c1684b9b1f6e30f65c8b9f Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Sun, 26 Jul 2015 13:41:23 +0200 Subject: [PATCH 5/8] make jdbc operator consistent with other db operators --- airflow/operators/jdbc_operator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 296203554025e..11fd4877a056d 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -46,5 +46,4 @@ def __init__( def execute(self, context): logging.info('Executing: ' + self.sql) self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id) - for row in self.hook.get_records(self.sql, self.autocommit): - logging.info('Result: ' + ','.join(map(str,row)) ) \ No newline at end of file + self.hook.run(self.sql, self.autocommit) \ No newline at end of file From 90c5ee696690c1830b65e0eaecf9629891240b72 Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Tue, 28 Jul 2015 10:22:50 +0200 Subject: [PATCH 6/8] move autocommit to own method for easier overwriting --- airflow/hooks/dbapi_hook.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index e429a5e80d741..b491123c10956 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -19,7 +19,6 @@ class DbApiHook(BaseHook): # Override with the object that exposes the connect method connector = None # Whether the db supports a special type of autocmmit - # TODO should be changed as a method set_autocommit that can be overriden supports_autocommit = False def __init__(self, *args, **kwargs): @@ -82,13 +81,16 @@ def run(self, sql, autocommit=False, parameters=None): """ conn = self.get_conn() if self.supports_autocommit: - conn.autocommit = autocommit + self.set_autocommit(conn,autocommit) cur = conn.cursor() cur.execute(sql) conn.commit() cur.close() conn.close() + def set_autocommit(self, conn, autocommit): + conn.autocommit = autocommit + def get_cursor(self): """Returns a cursor""" return self.get_conn().cursor() From ebe15bdf1866893c0920404e88824182700ceca0 Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Tue, 28 Jul 2015 10:25:10 +0200 Subject: [PATCH 7/8] remove unnecessary overrides --- airflow/hooks/jdbc_hook.py | 38 ++++++-------------------------------- 1 file changed, 6 insertions(+), 32 deletions(-) diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index d715da651362b..29cf30d737e1c 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -33,50 +33,24 @@ class JdbcHook(DbApiHook): default_conn_name = 'jdbc_default' supports_autocommit = True - def __init__( - self, *args, **kwargs): - - super(JdbcHook,self).__init__(*args,**kwargs) - - #conn_id = getattr(self, self.conn_name_attr) - #if (conn_id is None): - # self.host = host - # self.login = login - # self.psw = psw - # #self.db = db - # #self.port = port - # self.extra = extra - # self.jdbc_driver_loc = jdbc_driver_loc - # self.jdbc_driver_name = jdbc_driver_name - #else: + def get_conn(self): conn = self.get_connection(getattr(self, self.conn_name_attr)) self.host = conn.host self.login = conn.login self.psw = conn.password - #self.db = conn.schema - #self.port = conn.port self.extra = conn.extra self.jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') self.jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') - - #self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra) - - def get_conn(self): conn = jaydebeapi.connect(self.jdbc_driver_name, [str(self.host), str(self.login), str(self.psw)], self.jdbc_driver_loc,) return conn - def run(self, sql, autocommit=False, parameters=None): + def set_autocommit(self, conn, autocommit): """ - Runs a command + Enable or disable autocommit for the given connection + :param conn: The connection + :return: """ - conn = self.get_conn() - if self.supports_autocommit: - conn.jconn.autocommit = autocommit - cur = conn.cursor() - cur.execute(sql) - conn.commit() - cur.close() - conn.close() \ No newline at end of file + conn.jconn.autocommit = autocommit \ No newline at end of file From 5c2cdf4ed1173f7c6691aae1f63f202e3af899b8 Mon Sep 17 00:00:00 2001 From: Jan Omar Date: Tue, 28 Jul 2015 10:39:28 +0200 Subject: [PATCH 8/8] fix instance to local vars --- airflow/hooks/jdbc_hook.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index 29cf30d737e1c..1556262753962 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -35,16 +35,15 @@ class JdbcHook(DbApiHook): def get_conn(self): conn = self.get_connection(getattr(self, self.conn_name_attr)) - self.host = conn.host - self.login = conn.login - self.psw = conn.password - self.extra = conn.extra - self.jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') - self.jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') - - conn = jaydebeapi.connect(self.jdbc_driver_name, - [str(self.host), str(self.login), str(self.psw)], - self.jdbc_driver_loc,) + host = conn.host + login = conn.login + psw = conn.password + jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') + jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') + + conn = jaydebeapi.connect(jdbc_driver_name, + [str(host), str(login), str(psw)], + jdbc_driver_loc,) return conn def set_autocommit(self, conn, autocommit):