diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py index e429a5e80d741..b491123c10956 100644 --- a/airflow/hooks/dbapi_hook.py +++ b/airflow/hooks/dbapi_hook.py @@ -19,7 +19,6 @@ class DbApiHook(BaseHook): # Override with the object that exposes the connect method connector = None # Whether the db supports a special type of autocmmit - # TODO should be changed as a method set_autocommit that can be overriden supports_autocommit = False def __init__(self, *args, **kwargs): @@ -82,13 +81,16 @@ def run(self, sql, autocommit=False, parameters=None): """ conn = self.get_conn() if self.supports_autocommit: - conn.autocommit = autocommit + self.set_autocommit(conn,autocommit) cur = conn.cursor() cur.execute(sql) conn.commit() cur.close() conn.close() + def set_autocommit(self, conn, autocommit): + conn.autocommit = autocommit + def get_cursor(self): """Returns a cursor""" return self.get_conn().cursor() diff --git a/airflow/hooks/jdbc_hook.py b/airflow/hooks/jdbc_hook.py index 6e0ed5828a155..1556262753962 100644 --- a/airflow/hooks/jdbc_hook.py +++ b/airflow/hooks/jdbc_hook.py @@ -3,9 +3,9 @@ import logging import jaydebeapi -from airflow.hooks.base_hook import BaseHook +from airflow.hooks.dbapi_hook import DbApiHook -class JdbcHook(BaseHook): +class JdbcHook(DbApiHook): """ General hook for jdbc db access. @@ -13,6 +13,8 @@ class JdbcHook(BaseHook): Raises an airflow error if the given connection id doesn't exist. Otherwise host, port, schema, username and password can be specified on the fly. + + :param jdbc_url: jdbc connection url :type jdbc_url: string :param jdbc_driver_name: jdbc driver name @@ -25,66 +27,29 @@ class JdbcHook(BaseHook): :type sql: string or string pointing to a template file. File must have a '.sql' extensions. """ - def __init__( - self, jdbc_url, jdbc_driver_name, jdbc_driver_loc,host=None, login=None, - psw=None, db=None, port=None, extra=None, conn_id=None): - self.jdbc_driver_loc = jdbc_driver_loc - self.jdbc_driver_name = jdbc_driver_name - - if (conn_id is None): - self.host = host - self.login = login - self.psw = psw - self.db = db - self.port = port - self.extra = extra - else: - conn = self.get_connection(conn_id) - self.host = conn.host - self.login = conn.login - self.psw = conn.password - self.db = conn.schema - self.port = conn.port - self.extra = conn.extra - self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra) + conn_name_attr = 'jdbc_conn_id' + default_conn_name = 'jdbc_default' + supports_autocommit = True def get_conn(self): - logging.info("Trying to connect using jdbc url: " + self.jdbc_url) - conn = jaydebeapi.connect(self.jdbc_driver_name, - [str(self.jdbc_url), str(self.login), str(self.psw)], - self.jdbc_driver_loc,) + conn = self.get_connection(getattr(self, self.conn_name_attr)) + host = conn.host + login = conn.login + psw = conn.password + jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path') + jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname') + + conn = jaydebeapi.connect(jdbc_driver_name, + [str(host), str(login), str(psw)], + jdbc_driver_loc,) return conn - def get_records(self, sql, autocommit=False): - ''' - Executes the sql and returns a set of records. - ''' - conn = self.get_conn() - conn.jconn.autocommit = autocommit - cur = conn.cursor() - cur.execute(sql) - rows = [] if not cur._rs else cur.fetchall() - cur.close() - conn.close() - return rows - - def get_pandas_df(self, sql): - ''' - Executes the sql and returns a pandas dataframe - ''' - import pandas.io.sql as psql - conn = self.get_conn() - df = psql.read_sql(sql, con=conn) - conn.close() - return df - - def run(self, sql, autocommit=False): - conn = self.get_conn() - conn.jconn.autocommit = autocommit - cur = conn.cursor() - cur.execute(sql) - conn.commit() - cur.close() - conn.close() \ No newline at end of file + def set_autocommit(self, conn, autocommit): + """ + Enable or disable autocommit for the given connection + :param conn: The connection + :return: + """ + conn.jconn.autocommit = autocommit \ No newline at end of file diff --git a/airflow/models.py b/airflow/models.py index 4899d7c31cb1b..64a83ab661bf9 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -321,7 +321,7 @@ def get_hook(self): elif self.conn_type == 'sqlite': return hooks.SqliteHook(sqlite_conn_id=self.conn_id) elif self.conn_type == 'jdbc': - return hooks.JdbcHook(conn_id=self.conn_id) + return hooks.JdbcHook(jdbc_conn_id=self.conn_id) except: return None diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py index 40a97538201b7..11fd4877a056d 100644 --- a/airflow/operators/jdbc_operator.py +++ b/airflow/operators/jdbc_operator.py @@ -35,20 +35,15 @@ class JdbcOperator(BaseOperator): @apply_defaults def __init__( self, sql, - jdbc_url, jdbc_driver_name, jdbc_driver_loc, - conn_id='jdbc_default', autocommit=False, + jdbc_conn_id='jdbc_default', autocommit=False, *args, **kwargs): super(JdbcOperator, self).__init__(*args, **kwargs) - self.jdbc_url=jdbc_url - self.jdbc_driver_name=jdbc_driver_name - self.jdbc_driver_loc=jdbc_driver_loc self.sql = sql - self.conn_id = conn_id + self.jdbc_conn_id = jdbc_conn_id self.autocommit = autocommit def execute(self, context): logging.info('Executing: ' + self.sql) - self.hook = JdbcHook(conn_id=self.conn_id,jdbc_driver_loc=self.jdbc_driver_loc, jdbc_driver_name=self.jdbc_driver_name,jdbc_url=self.jdbc_url) - for row in self.hook.get_records(self.sql, self.autocommit): - logging.info('Result: ' + ','.join(map(str,row)) ) \ No newline at end of file + self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id) + self.hook.run(self.sql, self.autocommit) \ No newline at end of file diff --git a/airflow/www/app.py b/airflow/www/app.py index a1157e6d7dc97..10aa6b4c23f07 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -21,7 +21,7 @@ import sqlalchemy as sqla from wtforms import ( widgets, - Form, DateTimeField, SelectField, TextAreaField, PasswordField) + Form, DateTimeField, SelectField, TextAreaField, PasswordField, StringField) from pygments import highlight, lexers from pygments.formatters import HtmlFormatter @@ -1578,13 +1578,17 @@ class TaskInstanceModelView(ModelViewOnly): # Hack to not add this view to the menu admin._menu = admin._menu[:-1] - class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): + create_template = 'airflow/conn_create.html' + edit_template = 'airflow/conn_edit.html' verbose_name = "Connection" verbose_name_plural = "Connections" column_default_sort = ('conn_id', False) column_list = ('conn_id', 'conn_type', 'host', 'port') form_overrides = dict(password=VisiblePasswordField) + form_extra_fields = { 'jdbc_drv_path' : StringField('Driver Path'), + 'jdbc_drv_clsname': StringField('Driver Class'), + } form_choices = { 'conn_type': [ ('ftp', 'FTP',), @@ -1602,13 +1606,30 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): ('samba', 'Samba',), ('sqlite', 'Sqlite',), ] + } + + def on_model_change(self, form, model, is_created): + formdata = form.data + if formdata['conn_type'] == 'jdbc': + jdbc = {key:formdata[key] for key in ('jdbc_drv_path','jdbc_drv_clsname' + #, 'jdbc_conn_url' + ) if key in formdata} + model.extra = json.dumps(jdbc) + + def on_form_prefill(self, form, id): + data = form.data + if 'extra' in data and data['extra'] != None: + d = json.loads(data['extra']) + #form.jdbc_conn_url.data = d['jdbc_conn_url'] + form.jdbc_drv_path.data = d['jdbc_drv_path'] + form.jdbc_drv_clsname.data = d['jdbc_drv_clsname'] + mv = ConnectionModelView( models.Connection, Session, name="Connections", category="Admin") admin.add_view(mv) - class UserModelView(wwwutils.SuperUserMixin, AirflowModelView): verbose_name = "User" verbose_name_plural = "Users" diff --git a/airflow/www/static/jdbc.js b/airflow/www/static/jdbc.js new file mode 100644 index 0000000000000..f3b42a637c517 --- /dev/null +++ b/airflow/www/static/jdbc.js @@ -0,0 +1,26 @@ +/** + * Created by janomar on 23/07/15. + */ + function toggleJdbc(connectionType) { + isJdbc = connectionType == 'jdbc' + $("#port").parent().parent().toggleClass('hide', isJdbc) + $("#schema").parent().parent().toggleClass('hide', isJdbc) + $("#extra").parent().parent().toggleClass('hide', isJdbc) + $("#jdbc_drv_clsname").parent().parent().toggleClass('hide', !isJdbc) + $("#jdbc_drv_path").parent().parent().toggleClass('hide', !isJdbc) + + if (isJdbc) { + $("label[for='host']").text("Connection URL"); + } else { + $("label[for='host']").text("Host"); + } + } + + jQuery(document).ready(function() { + var conn_type =jQuery("#conn_type").val(); + jQuery("#conn_type").on('change', function(e) { + conn_type = jQuery("#conn_type").val(); + toggleJdbc(conn_type); + }); + toggleJdbc(conn_type); +}); \ No newline at end of file diff --git a/airflow/www/templates/airflow/conn_create.html b/airflow/www/templates/airflow/conn_create.html new file mode 100644 index 0000000000000..e878842bfdd7a --- /dev/null +++ b/airflow/www/templates/airflow/conn_create.html @@ -0,0 +1,6 @@ +{% extends 'airflow/model_create.html' %} + +{% block tail %} + {{ super() }} + +{% endblock %} diff --git a/airflow/www/templates/airflow/conn_edit.html b/airflow/www/templates/airflow/conn_edit.html new file mode 100644 index 0000000000000..2c45f6849aee0 --- /dev/null +++ b/airflow/www/templates/airflow/conn_edit.html @@ -0,0 +1,6 @@ +{% extends 'airflow/model_edit.html' %} + +{% block tail %} + {{ super() }} + +{% endblock %}