Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/hooks/dbapi_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class DbApiHook(BaseHook):
# Override with the object that exposes the connect method
connector = None
# Whether the db supports a special type of autocmmit
# TODO should be changed as a method set_autocommit that can be overriden
supports_autocommit = False

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -82,13 +81,16 @@ def run(self, sql, autocommit=False, parameters=None):
"""
conn = self.get_conn()
if self.supports_autocommit:
conn.autocommit = autocommit
self.set_autocommit(conn,autocommit)
cur = conn.cursor()
cur.execute(sql)
conn.commit()
cur.close()
conn.close()

def set_autocommit(self, conn, autocommit):
conn.autocommit = autocommit

def get_cursor(self):
"""Returns a cursor"""
return self.get_conn().cursor()
Expand Down
83 changes: 24 additions & 59 deletions airflow/hooks/jdbc_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import logging
import jaydebeapi

from airflow.hooks.base_hook import BaseHook
from airflow.hooks.dbapi_hook import DbApiHook

class JdbcHook(BaseHook):
class JdbcHook(DbApiHook):
"""
General hook for jdbc db access.

If a connection id is specified, host, port, schema, username and password will be taken from the predefined connection.
Raises an airflow error if the given connection id doesn't exist.
Otherwise host, port, schema, username and password can be specified on the fly.



:param jdbc_url: jdbc connection url
:type jdbc_url: string
:param jdbc_driver_name: jdbc driver name
Expand All @@ -25,66 +27,29 @@ class JdbcHook(BaseHook):
:type sql: string or string pointing to a template file. File must have
a '.sql' extensions.
"""
def __init__(
self, jdbc_url, jdbc_driver_name, jdbc_driver_loc,host=None, login=None,
psw=None, db=None, port=None, extra=None, conn_id=None):
self.jdbc_driver_loc = jdbc_driver_loc
self.jdbc_driver_name = jdbc_driver_name

if (conn_id is None):
self.host = host
self.login = login
self.psw = psw
self.db = db
self.port = port
self.extra = extra
else:
conn = self.get_connection(conn_id)
self.host = conn.host
self.login = conn.login
self.psw = conn.password
self.db = conn.schema
self.port = conn.port
self.extra = conn.extra


self.jdbc_url = jdbc_url.format(self.host, self.port, self.db, self.extra)
conn_name_attr = 'jdbc_conn_id'
default_conn_name = 'jdbc_default'
supports_autocommit = True

def get_conn(self):
logging.info("Trying to connect using jdbc url: " + self.jdbc_url)
conn = jaydebeapi.connect(self.jdbc_driver_name,
[str(self.jdbc_url), str(self.login), str(self.psw)],
self.jdbc_driver_loc,)
conn = self.get_connection(getattr(self, self.conn_name_attr))
host = conn.host
login = conn.login
psw = conn.password
jdbc_driver_loc = conn.extra_dejson.get('jdbc_drv_path')
jdbc_driver_name = conn.extra_dejson.get('jdbc_drv_clsname')

conn = jaydebeapi.connect(jdbc_driver_name,
[str(host), str(login), str(psw)],
jdbc_driver_loc,)
return conn

def get_records(self, sql, autocommit=False):
'''
Executes the sql and returns a set of records.
'''
conn = self.get_conn()
conn.jconn.autocommit = autocommit
cur = conn.cursor()
cur.execute(sql)
rows = [] if not cur._rs else cur.fetchall()
cur.close()
conn.close()
return rows

def get_pandas_df(self, sql):
'''
Executes the sql and returns a pandas dataframe
'''
import pandas.io.sql as psql
conn = self.get_conn()
df = psql.read_sql(sql, con=conn)
conn.close()
return df

def run(self, sql, autocommit=False):
conn = self.get_conn()
conn.jconn.autocommit = autocommit
cur = conn.cursor()
cur.execute(sql)
conn.commit()
cur.close()
conn.close()
def set_autocommit(self, conn, autocommit):
"""
Enable or disable autocommit for the given connection
:param conn: The connection
:return:
"""
conn.jconn.autocommit = autocommit
2 changes: 1 addition & 1 deletion airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def get_hook(self):
elif self.conn_type == 'sqlite':
return hooks.SqliteHook(sqlite_conn_id=self.conn_id)
elif self.conn_type == 'jdbc':
return hooks.JdbcHook(conn_id=self.conn_id)
return hooks.JdbcHook(jdbc_conn_id=self.conn_id)
except:
return None

Expand Down
13 changes: 4 additions & 9 deletions airflow/operators/jdbc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,15 @@ class JdbcOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql,
jdbc_url, jdbc_driver_name, jdbc_driver_loc,
conn_id='jdbc_default', autocommit=False,
jdbc_conn_id='jdbc_default', autocommit=False,
*args, **kwargs):
super(JdbcOperator, self).__init__(*args, **kwargs)

self.jdbc_url=jdbc_url
self.jdbc_driver_name=jdbc_driver_name
self.jdbc_driver_loc=jdbc_driver_loc
self.sql = sql
self.conn_id = conn_id
self.jdbc_conn_id = jdbc_conn_id
self.autocommit = autocommit

def execute(self, context):
logging.info('Executing: ' + self.sql)
self.hook = JdbcHook(conn_id=self.conn_id,jdbc_driver_loc=self.jdbc_driver_loc, jdbc_driver_name=self.jdbc_driver_name,jdbc_url=self.jdbc_url)
for row in self.hook.get_records(self.sql, self.autocommit):
logging.info('Result: ' + ','.join(map(str,row)) )
self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
self.hook.run(self.sql, self.autocommit)
27 changes: 24 additions & 3 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sqlalchemy as sqla
from wtforms import (
widgets,
Form, DateTimeField, SelectField, TextAreaField, PasswordField)
Form, DateTimeField, SelectField, TextAreaField, PasswordField, StringField)

from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
Expand Down Expand Up @@ -1578,13 +1578,17 @@ class TaskInstanceModelView(ModelViewOnly):
# Hack to not add this view to the menu
admin._menu = admin._menu[:-1]


class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
create_template = 'airflow/conn_create.html'
edit_template = 'airflow/conn_edit.html'
verbose_name = "Connection"
verbose_name_plural = "Connections"
column_default_sort = ('conn_id', False)
column_list = ('conn_id', 'conn_type', 'host', 'port')
form_overrides = dict(password=VisiblePasswordField)
form_extra_fields = { 'jdbc_drv_path' : StringField('Driver Path'),
'jdbc_drv_clsname': StringField('Driver Class'),
}
form_choices = {
'conn_type': [
('ftp', 'FTP',),
Expand All @@ -1602,13 +1606,30 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
('samba', 'Samba',),
('sqlite', 'Sqlite',),
]

}

def on_model_change(self, form, model, is_created):
formdata = form.data
if formdata['conn_type'] == 'jdbc':
jdbc = {key:formdata[key] for key in ('jdbc_drv_path','jdbc_drv_clsname'
#, 'jdbc_conn_url'
) if key in formdata}
model.extra = json.dumps(jdbc)

def on_form_prefill(self, form, id):
data = form.data
if 'extra' in data and data['extra'] != None:
d = json.loads(data['extra'])
#form.jdbc_conn_url.data = d['jdbc_conn_url']
form.jdbc_drv_path.data = d['jdbc_drv_path']
form.jdbc_drv_clsname.data = d['jdbc_drv_clsname']

mv = ConnectionModelView(
models.Connection, Session,
name="Connections", category="Admin")
admin.add_view(mv)


class UserModelView(wwwutils.SuperUserMixin, AirflowModelView):
verbose_name = "User"
verbose_name_plural = "Users"
Expand Down
26 changes: 26 additions & 0 deletions airflow/www/static/jdbc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Created by janomar on 23/07/15.
*/
function toggleJdbc(connectionType) {
isJdbc = connectionType == 'jdbc'
$("#port").parent().parent().toggleClass('hide', isJdbc)
$("#schema").parent().parent().toggleClass('hide', isJdbc)
$("#extra").parent().parent().toggleClass('hide', isJdbc)
$("#jdbc_drv_clsname").parent().parent().toggleClass('hide', !isJdbc)
$("#jdbc_drv_path").parent().parent().toggleClass('hide', !isJdbc)

if (isJdbc) {
$("label[for='host']").text("Connection URL");
} else {
$("label[for='host']").text("Host");
}
}

jQuery(document).ready(function() {
var conn_type =jQuery("#conn_type").val();
jQuery("#conn_type").on('change', function(e) {
conn_type = jQuery("#conn_type").val();
toggleJdbc(conn_type);
});
toggleJdbc(conn_type);
});
6 changes: 6 additions & 0 deletions airflow/www/templates/airflow/conn_create.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{% extends 'airflow/model_create.html' %}

{% block tail %}
{{ super() }}
<script src="{{ url_for('static', filename='jdbc.js') }}"></script>
{% endblock %}
6 changes: 6 additions & 0 deletions airflow/www/templates/airflow/conn_edit.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{% extends 'airflow/model_edit.html' %}

{% block tail %}
{{ super() }}
<script src="{{ url_for('static', filename='jdbc.js') }}"></script>
{% endblock %}