Skip to content

Commit

Permalink
Merge pull request #499 from alexanderlz/master
Browse files Browse the repository at this point in the history
Feature: Support Impala as DataSource
  • Loading branch information
arikfr committed Jul 22, 2015
2 parents d4a18ba + e4b9d23 commit 5d3caac
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 2 deletions.
159 changes: 159 additions & 0 deletions redash/query_runner/impala_ds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import json
import logging
import sys

from redash.query_runner import *
from redash.utils import JSONEncoder

logger = logging.getLogger(__name__)

try:
from impala.dbapi import connect
from impala.error import DatabaseError, RPCError
enabled = True
except ImportError, e:
logger.exception(e)
logger.warning("Missing dependencies. Please install impyla.")
logger.warning("You can use pip: pip install impyla")
enabled = False

COLUMN_NAME = 0
COLUMN_TYPE = 1

types_map = {
'BIGINT': TYPE_INTEGER,
'TINYINT': TYPE_INTEGER,
'SMALLINT': TYPE_INTEGER,
'INT': TYPE_INTEGER,
'DOUBLE': TYPE_FLOAT,
'DECIMAL': TYPE_FLOAT,
'FLOAT': TYPE_FLOAT,
'REAL': TYPE_FLOAT,
'BOOLEAN': TYPE_BOOLEAN,
'TIMESTAMP': TYPE_DATETIME,
'CHAR': TYPE_STRING,
'STRING': TYPE_STRING,
'VARCHAR': TYPE_STRING
}


class Impala(BaseQueryRunner):
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"host": {
"type": "string"
},
"port": {
"type": "number"
},
"protocol": {
"type": "string",
"title": "Please specify beeswax or hiveserver2"
},
"database": {
"type": "string"
},
"use_ldap": {
"type": "boolean"
},
"ldap_user": {
"type": "string"
},
"ldap_password": {
"type": "string"
},
"timeout": {
"type": "number"
}
},
"required": ["host"]
}

@classmethod
def type(cls):
return "impala"

def __init__(self, configuration_json):
super(Impala, self).__init__(configuration_json)

def _run_query_internal(self, query):
results, error = self.run_query(query)

if error is not None:
raise Exception("Failed getting schema.")
return json.loads(results)['rows']

def get_schema(self):
try:
schemas_query = "show schemas;"

tables_query = "show tables in %s;"

columns_query = "show column stats %s;"

schema = {}
for schema_name in map(lambda a: a['name'], self._run_query_internal(schemas_query)):
for table_name in map(lambda a: a['name'], self._run_query_internal(tables_query % schema_name)):
columns = map(lambda a: a['Column'], self._run_query_internal(columns_query % table_name))

if schema_name != 'default':
table_name = '{}.{}'.format(schema_name, table_name)

schema[table_name] = {'name': table_name, 'columns': columns}
except Exception, e:
raise sys.exc_info()[1], None, sys.exc_info()[2]
return schema.values()

def run_query(self, query):

connection = None
try:
connection = connect(**self.configuration)

cursor = connection.cursor()

cursor.execute(query)

column_names = []
columns = []

for column in cursor.description:
column_name = column[COLUMN_NAME]
column_names.append(column_name)

columns.append({
'name': column_name,
'friendly_name': column_name,
'type': types_map.get(column[COLUMN_TYPE], None)
})

rows = [dict(zip(column_names, row)) for row in cursor]

data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
except DatabaseError as e:
logging.exception(e)
json_data = None
error = e.message
except RPCError as e:
logging.exception(e)
json_data = None
error = "Metastore Error [%s]" % e.message
except KeyboardInterrupt:
connection.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as e:
logging.exception(e)
raise sys.exc_info()[1], None, sys.exc_info()[2]
finally:
connection.close()

return json_data, error

register(Impala)
2 changes: 1 addition & 1 deletion redash/query_runner/pg.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def get_schema(self):
results, error = self.run_query(query)

if error is not None:
raise Exception("Failed getting schema.")
raise Exception("Failed getting schema.")

results = json.loads(results)

Expand Down
1 change: 1 addition & 0 deletions redash/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def parse_boolean(str):
'redash.query_runner.url',
'redash.query_runner.influx_db',
'redash.query_runner.presto',
'redash.query_runner.impala_ds',
])))

# Features:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ click==3.3
RestrictedPython==3.6.0
wtf-peewee==0.2.3
pysaml2==2.4.0
pycrypto==2.6.1
pycrypto==2.6.1

0 comments on commit 5d3caac

Please sign in to comment.