Change: correctly propagate ElasticSearch errors to the UI #1402

Merged
merged 2 commits on Nov 20, 2016
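
This PR stops the Elasticsearch query runners from raising on failed HTTP calls; instead each request is wrapped in try/except, the exception is logged, and a human-readable message is handed back through the (data, error) return value that the UI displays. The sketch below distills that pattern into a standalone helper; the name fetch_json and the URL are illustrative only, not part of the PR, but the two except branches mirror the ones added throughout the diff.

import logging
import requests

logger = logging.getLogger(__name__)

def fetch_json(url, auth=None):
    """Return (data, error); exactly one of the two is None."""
    try:
        r = requests.get(url, auth=auth)
        r.raise_for_status()  # turns 4xx/5xx responses into requests.HTTPError
        return r.json(), None
    except requests.HTTPError as e:
        # A response arrived but reported failure: surface status code and body.
        logger.exception(e)
        return None, "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
    except requests.exceptions.RequestException as e:
        # No usable response at all (DNS failure, refused connection, timeout).
        logger.exception(e)
        return None, "Connection refused"

Note the except order: requests.HTTPError is a subclass of requests.exceptions.RequestException, so the more specific handler must come first.
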
136 changes: 83 additions & 53 deletions redash/query_runner/elasticsearch.py
@@ -17,27 +17,27 @@
logger = logging.getLogger(__name__)

ELASTICSEARCH_TYPES_MAPPING = {
"integer" : TYPE_INTEGER,
"long" : TYPE_INTEGER,
"float" : TYPE_FLOAT,
"double" : TYPE_FLOAT,
"boolean" : TYPE_BOOLEAN,
"string" : TYPE_STRING,
"date" : TYPE_DATE,
"object" : TYPE_STRING,
"integer": TYPE_INTEGER,
"long": TYPE_INTEGER,
"float": TYPE_FLOAT,
"double": TYPE_FLOAT,
"boolean": TYPE_BOOLEAN,
"string": TYPE_STRING,
"date": TYPE_DATE,
"object": TYPE_STRING,
# "geo_point" TODO: Need to split to 2 fields somehow
}

ELASTICSEARCH_BUILTIN_FIELDS_MAPPING = {
"_id" : "Id",
"_score" : "Score"
"_id": "Id",
"_score": "Score"
}

PYTHON_TYPES_MAPPING = {
str: TYPE_STRING,
unicode: TYPE_STRING,
bool : TYPE_BOOLEAN,
int : TYPE_INTEGER,
bool: TYPE_BOOLEAN,
int: TYPE_INTEGER,
long: TYPE_INTEGER,
float: TYPE_FLOAT
}
@@ -101,25 +101,37 @@ def __init__(self, configuration):

def _get_mappings(self, url):
mappings = {}
error = None

r = requests.get(url, auth=self.auth)
mappings_data = r.json()

for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if not property_name in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))

return mappings
try:
r = requests.get(url, auth=self.auth)
r.raise_for_status()

mappings_data = r.json()

for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if property_name not in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
mappings = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
mappings = None

return mappings, error

def _parse_results(self, mappings, result_fields, raw_result, result_columns, result_rows):
def add_column_if_needed(mappings, column_name, friendly_name, result_columns, result_columns_index):
@@ -145,9 +157,7 @@ def collect_value(mappings, row, key, value, type):
row[key] = value

def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index):

if isinstance(data, dict):

for key, value in data.iteritems():
val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index)
if val:
@@ -164,7 +174,6 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,
return data[data_key]

elif isinstance(data, list):

for value in data:
result_row = get_row(rows, row)
collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index)
@@ -176,23 +185,20 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

return None

result_columns_index = {c["name"] : c for c in result_columns}
result_columns_index = {c["name"]: c for c in result_columns}

result_fields_index = {}
if result_fields:
for r in result_fields:
result_fields_index[r] = None

if 'error' in raw_result:

error = raw_result['error']
if len(error) > 10240:
error = error[:10240] + '... continues'

raise Exception(error)

elif 'aggregations' in raw_result:

if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
@@ -202,9 +208,7 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

logger.debug("result_rows %s", str(result_rows))
logger.debug("result_columns %s", str(result_columns))

elif 'hits' in raw_result and 'hits' in raw_result['hits']:

if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
@@ -224,14 +228,18 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

result_rows.append(row)
else:

raise Exception("Redash failed to parse the results it got from ElasticSearch.")

def test_connection(self):
r = requests.get("{0}/_cluster/health".format(self.server_url), auth=self.auth)
if r.status_code != 200:
raise Exception("Connection test failed.. Return Code: {0}"
" Reason: {1}".format(r.status_code, r.text))
try:
r = requests.get("{0}/_cluster/health".format(self.server_url), auth=self.auth)
r.raise_for_status()
except requests.HTTPError as e:
logger.exception(e)
raise Exception("Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text))
except requests.exceptions.RequestException as e:
logger.exception(e)
raise Exception("Connection refused")


class Kibana(BaseElasticSearch):
@@ -249,8 +257,7 @@ def annotate_query(cls):
def _execute_simple_query(self, url, auth, _from, mappings, result_fields, result_columns, result_rows):
url += "&from={0}".format(_from)
r = requests.get(url, auth=self.auth)
if r.status_code != 200:
raise Exception("Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text))
r.raise_for_status()

raw_result = r.json()

@@ -283,9 +290,10 @@ def run_query(self, query, user):
url = "{0}/{1}/_search?".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)

mappings = self._get_mappings(mapping_url)

logger.debug(json.dumps(mappings, indent=4))
mappings, error = self._get_mappings(mapping_url)
if error:
return None, error
#logger.debug(json.dumps(mappings, indent=4))

if sort:
url += "&sort={0}".format(urllib.quote_plus(sort))
@@ -310,13 +318,22 @@ def run_query(self, query, user):
raise Exception("Advanced queries are not supported")

json_data = json.dumps({
"columns" : result_columns,
"rows" : result_rows
"columns": result_columns,
"rows": result_rows
})
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
except Exception as e:
logger.exception(e)
raise sys.exc_info()[1], None, sys.exc_info()[2]

return json_data, error
@@ -353,25 +370,38 @@ def run_query(self, query, user):
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)

mappings, error = self._get_mappings(mapping_url)
if error:
return None, error

params = {"source": json.dumps(query_dict)}
logger.debug("Using URL: %s", url)
logger.debug("Using params : %s", params)
r = requests.get(url, params=params, auth=self.auth)
r.raise_for_status()
logger.debug("Result: %s", r.json())

result_columns = []
result_rows = []
self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows)

json_data = json.dumps({
"columns" : result_columns,
"rows" : result_rows
"columns": result_columns,
"rows": result_rows
})
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
except Exception as e:
logger.exception(e)
raise sys.exc_info()[1], None, sys.exc_info()[2]

return json_data, error
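
For context, a minimal sketch of the caller side, assuming the standard Redash query-runner contract in which run_query returns (json_data, error); render_result is a hypothetical helper, not code from this PR:

def render_result(runner, query, user):
    # run_query returns (json_data, error); a non-None error string is
    # what the UI now shows instead of an unhandled exception.
    json_data, error = runner.run_query(query, user)
    if error:
        return {"error": error}
    return {"data": json_data}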