Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athena & Presto: support column type and partition key #3020

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions client/app/components/QueryEditor.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ function buildKeywordsFromSchema(schema) {
schema.forEach((table) => {
keywords[table.name] = 'Table';
table.columns.forEach((c) => {
keywords[c] = 'Column';
keywords[`${table.name}.${c}`] = 'Column';
if (typeof c === 'string') {
keywords[c] = 'Column';
keywords[`${table.name}.${c}`] = 'Column';
} else if (typeof c === 'object') {
c.forEach((a, b) => {
if (a === 'column_name') {
const columnName = b;
keywords[columnName] = 'Column';
keywords[`${table.name}.${columnName}`] = 'Column';
}
});
}
});
});

Expand Down
4 changes: 2 additions & 2 deletions client/app/components/queries/schema-browser.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
ng-click="$ctrl.itemSelected($event, [table.name])"></i>
</div>
<div uib-collapse="table.collapsed">
<div ng-repeat="column in table.columns | filter:$ctrl.schemaFilterColumn track by column" class="table-open">{{column}}
<div ng-repeat="column in table.columns[0] | filter:$ctrl.schemaFilterColumn track by column.column_name" class="table-open"><span ng-if="column.extra_info == 'partition key'"><i class="fa fa-key" aria-hidden="true"></i></span> {{column.column_name}} <span ng-if="column.column_type"> ({{column.column_type}})</span>
<i class="fa fa-angle-double-right copy-to-editor" aria-hidden="true"
ng-click="$ctrl.itemSelected($event, [column])"></i>
ng-click="$ctrl.itemSelected($event, [column.column_name])"></i>
</div>
</div>
</div>
Expand Down
30 changes: 18 additions & 12 deletions redash/query_runner/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from redash.query_runner import *
from redash.settings import parse_boolean
from redash.utils import json_dumps, json_loads
from .presto import format_schema

logger = logging.getLogger(__name__)
ANNOTATE_QUERY = parse_boolean(os.environ.get('ATHENA_ANNOTATE_QUERY', 'true'))
Expand Down Expand Up @@ -127,21 +128,31 @@ def __get_schema_from_glue(self):
iterator = paginator.paginate(DatabaseName=database['Name'])
for table in iterator.search('TableList[]'):
table_name = '%s.%s' % (database['Name'], table['Name'])
columns = []
if table_name not in schema:
column = [columns['Name'] for columns in table['StorageDescriptor']['Columns']]
schema[table_name] = {'name': table_name, 'columns': column}
for partition in table.get('PartitionKeys', []):
schema[table_name]['columns'].append(partition['Name'])
schema[table_name] = {'name': table_name, 'columns': []}

for partition in table.get('PartitionKeys', []):
columns.append({
'column_name': partition['Name'],
'extra_info': 'partition key'
})
for column in table['StorageDescriptor']['Columns']:
columns.append({
'column_name': column['Name'],
'column_type': column['Type'],
})
schema[table_name]['columns'].append(columns)
print schema.values()

return schema.values()

def get_schema(self, get_stats=False):
if self.configuration.get('glue', False):
return self.__get_schema_from_glue()

schema = {}
query = """
SELECT table_schema, table_name, column_name
SELECT table_schema, table_name, column_name, data_type as column_type, extra_info
FROM information_schema.columns
WHERE table_schema NOT IN ('information_schema')
"""
Expand All @@ -150,12 +161,7 @@ def get_schema(self, get_stats=False):
if error is not None:
raise Exception("Failed getting schema.")

results = json_loads(results)
for row in results['rows']:
table_name = '{0}.{1}'.format(row['table_schema'], row['table_name'])
if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}
schema[table_name]['columns'].append(row['column_name'])
schema = format_schema(json.loads(results))

return schema.values()

Expand Down
32 changes: 22 additions & 10 deletions redash/query_runner/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@
}


def format_schema(results):
    """
    Group ``information_schema.columns`` rows by table for the UI schema
    browser (shared by the Athena and Presto query runners).

    :param results: parsed query result; a dict whose ``'rows'`` entry is a
        list of row dicts. Each row must carry ``table_schema``,
        ``table_name`` and ``column_name``; ``column_type`` and
        ``extra_info`` are read when present and default to ``None``.
    :return: dict mapping ``'<schema>.<table>'`` to
        ``{'name': '<schema>.<table>', 'columns': [[column, ...]]}`` where
        each ``column`` is a dict with the keys ``column_name``,
        ``extra_info`` and ``column_type``. Callers take ``.values()`` of it.
    :raises KeyError: if ``results`` has no ``'rows'`` entry or a row lacks
        one of the three mandatory keys.
    """
    schema = {}
    for row in results['rows']:
        table_name = '{}.{}'.format(row['table_schema'], row['table_name'])
        if table_name not in schema:
            # 'columns' wraps a single inner list: the Glue schema path
            # appends its column list the same way and the schema browser
            # template iterates over ``table.columns[0]``.
            schema[table_name] = {'name': table_name, 'columns': [[]]}
        # Accumulate on the table's own list so that every row of the table
        # is kept, not only the last one seen.
        schema[table_name]['columns'][0].append({
            'column_name': row['column_name'],
            'extra_info': row.get('extra_info'),
            'column_type': row.get('column_type'),
        })
    return schema


class Presto(BaseQueryRunner):
noop_query = 'SHOW TABLES'

Expand Down Expand Up @@ -72,7 +92,7 @@ def type(cls):
def get_schema(self, get_stats=False):
schema = {}
query = """
SELECT table_schema, table_name, column_name
SELECT table_schema, table_name, column_name, data_type as column_type, extra_info
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
"""
Expand All @@ -82,15 +102,7 @@ def get_schema(self, get_stats=False):
if error is not None:
raise Exception("Failed getting schema.")

results = json_loads(results)

for row in results['rows']:
table_name = '{}.{}'.format(row['table_schema'], row['table_name'])

if table_name not in schema:
schema[table_name] = {'name': table_name, 'columns': []}

schema[table_name]['columns'].append(row['column_name'])
schema = format_schema(json.loads(results))

return schema.values()

Expand Down
8 changes: 4 additions & 4 deletions tests/query_runner/test_athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def test_external_table(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
assert query_runner.get_schema() == [{'columns': ['row_id'], 'name': 'test1.jdbc_table'}]
assert query_runner.get_schema() == [{'columns': [[{'column_name':'row_id','column_type':'int'}]], 'name': 'test1.jdbc_table'}]

def test_partitioned_table(self):
"""
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_partitioned_table(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
assert query_runner.get_schema() == [{'columns': ['sk', 'category'], 'name': 'test1.partitioned_table'}]
assert query_runner.get_schema() == [{'columns': [[{'extra_info':'partition key','column_name':'category'},{'column_type':'int','column_name':'sk'}]], 'name': 'test1.partitioned_table'}]

def test_view(self):
query_runner = Athena({'glue': True, 'region': 'mars-east-1'})
Expand Down Expand Up @@ -150,7 +150,7 @@ def test_view(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
assert query_runner.get_schema() == [{'columns': ['sk'], 'name': 'test1.view'}]
assert query_runner.get_schema() == [{'columns': [[{'column_name':'sk', 'column_type':'int'}]], 'name': 'test1.view'}]

def test_dodgy_table_does_not_break_schema_listing(self):
"""
Expand Down Expand Up @@ -187,4 +187,4 @@ def test_dodgy_table_does_not_break_schema_listing(self):
{'DatabaseName': 'test1'},
)
with self.stubber:
assert query_runner.get_schema() == [{'columns': ['region'], 'name': 'test1.csv'}]
assert query_runner.get_schema() == [{'columns': [[{'column_name':'region','column_type':'string'}]], 'name': 'test1.csv'}]