Skip to content

Commit

Permalink
Merge pull request getredash#1402 from someones/es_error_propagation
Browse files Browse the repository at this point in the history
Change: correctly propagate ElasticSearch errors to the UI
  • Loading branch information
arikfr authored Nov 20, 2016
2 parents ac60e07 + b0c45c7 commit 52a5a97
Showing 1 changed file with 83 additions and 53 deletions.
136 changes: 83 additions & 53 deletions query_runner/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@
logger = logging.getLogger(__name__)

ELASTICSEARCH_TYPES_MAPPING = {
"integer" : TYPE_INTEGER,
"long" : TYPE_INTEGER,
"float" : TYPE_FLOAT,
"double" : TYPE_FLOAT,
"boolean" : TYPE_BOOLEAN,
"string" : TYPE_STRING,
"date" : TYPE_DATE,
"object" : TYPE_STRING,
"integer": TYPE_INTEGER,
"long": TYPE_INTEGER,
"float": TYPE_FLOAT,
"double": TYPE_FLOAT,
"boolean": TYPE_BOOLEAN,
"string": TYPE_STRING,
"date": TYPE_DATE,
"object": TYPE_STRING,
# "geo_point" TODO: Need to split to 2 fields somehow
}

ELASTICSEARCH_BUILTIN_FIELDS_MAPPING = {
"_id" : "Id",
"_score" : "Score"
"_id": "Id",
"_score": "Score"
}

PYTHON_TYPES_MAPPING = {
str: TYPE_STRING,
unicode: TYPE_STRING,
bool : TYPE_BOOLEAN,
int : TYPE_INTEGER,
bool: TYPE_BOOLEAN,
int: TYPE_INTEGER,
long: TYPE_INTEGER,
float: TYPE_FLOAT
}
Expand Down Expand Up @@ -101,25 +101,37 @@ def __init__(self, configuration):

def _get_mappings(self, url):
mappings = {}
error = None

r = requests.get(url, auth=self.auth)
mappings_data = r.json()

for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if not property_name in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))

return mappings
try:
r = requests.get(url, auth=self.auth)
r.raise_for_status()

mappings_data = r.json()

for index_name in mappings_data:
index_mappings = mappings_data[index_name]
for m in index_mappings.get("mappings", {}):
for property_name in index_mappings["mappings"][m]["properties"]:
property_data = index_mappings["mappings"][m]["properties"][property_name]
if property_name not in mappings:
property_type = property_data.get("type", None)
if property_type:
if property_type in ELASTICSEARCH_TYPES_MAPPING:
mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING[property_type]
else:
mappings[property_name] = TYPE_STRING
#raise Exception("Unknown property type: {0}".format(property_type))
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
mappings = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
mappings = None

return mappings, error

def _parse_results(self, mappings, result_fields, raw_result, result_columns, result_rows):
def add_column_if_needed(mappings, column_name, friendly_name, result_columns, result_columns_index):
Expand All @@ -145,9 +157,7 @@ def collect_value(mappings, row, key, value, type):
row[key] = value

def collect_aggregations(mappings, rows, parent_key, data, row, result_columns, result_columns_index):

if isinstance(data, dict):

for key, value in data.iteritems():
val = collect_aggregations(mappings, rows, parent_key if key == 'buckets' else key, value, row, result_columns, result_columns_index)
if val:
Expand All @@ -164,7 +174,6 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,
return data[data_key]

elif isinstance(data, list):

for value in data:
result_row = get_row(rows, row)
collect_aggregations(mappings, rows, parent_key, value, result_row, result_columns, result_columns_index)
Expand All @@ -176,23 +185,20 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

return None

result_columns_index = {c["name"] : c for c in result_columns}
result_columns_index = {c["name"]: c for c in result_columns}

result_fields_index = {}
if result_fields:
for r in result_fields:
result_fields_index[r] = None

if 'error' in raw_result:

error = raw_result['error']
if len(error) > 10240:
error = error[:10240] + '... continues'

raise Exception(error)

elif 'aggregations' in raw_result:

if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
Expand All @@ -202,9 +208,7 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

logger.debug("result_rows %s", str(result_rows))
logger.debug("result_columns %s", str(result_columns))

elif 'hits' in raw_result and 'hits' in raw_result['hits']:

if result_fields:
for field in result_fields:
add_column_if_needed(mappings, field, field, result_columns, result_columns_index)
Expand All @@ -224,14 +228,18 @@ def collect_aggregations(mappings, rows, parent_key, data, row, result_columns,

result_rows.append(row)
else:

raise Exception("Redash failed to parse the results it got from ElasticSearch.")

def test_connection(self):
r = requests.get("{0}/_cluster/health".format(self.server_url), auth=self.auth)
if r.status_code != 200:
raise Exception("Connection test failed.. Return Code: {0}"
" Reason: {1}".format(r.status_code, r.text))
try:
r = requests.get("{0}/_cluster/health".format(self.server_url), auth=self.auth)
r.raise_for_status()
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"


class Kibana(BaseElasticSearch):
Expand All @@ -249,8 +257,7 @@ def annotate_query(cls):
def _execute_simple_query(self, url, auth, _from, mappings, result_fields, result_columns, result_rows):
url += "&from={0}".format(_from)
r = requests.get(url, auth=self.auth)
if r.status_code != 200:
raise Exception("Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text))
r.raise_for_status()

raw_result = r.json()

Expand Down Expand Up @@ -283,9 +290,10 @@ def run_query(self, query, user):
url = "{0}/{1}/_search?".format(self.server_url, index_name)
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)

mappings = self._get_mappings(mapping_url)

logger.debug(json.dumps(mappings, indent=4))
mappings, error = self._get_mappings(mapping_url)
if error:
return None, error
#logger.debug(json.dumps(mappings, indent=4))

if sort:
url += "&sort={0}".format(urllib.quote_plus(sort))
Expand All @@ -310,13 +318,22 @@ def run_query(self, query, user):
raise Exception("Advanced queries are not supported")

json_data = json.dumps({
"columns" : result_columns,
"rows" : result_rows
"columns": result_columns,
"rows": result_rows
})
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
except Exception as e:
logger.exception(e)
raise sys.exc_info()[1], None, sys.exc_info()[2]

return json_data, error
Expand Down Expand Up @@ -353,25 +370,38 @@ def run_query(self, query, user):
mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)

mappings = self._get_mappings(mapping_url)
if error:
return None, error

params = {"source": json.dumps(query_dict)}
logger.debug("Using URL: %s", url)
logger.debug("Using params : %s", params)
r = requests.get(url, params=params, auth=self.auth)
r.raise_for_status()
logger.debug("Result: %s", r.json())

result_columns = []
result_rows = []
self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows)

json_data = json.dumps({
"columns" : result_columns,
"rows" : result_rows
"columns": result_columns,
"rows": result_rows
})
except KeyboardInterrupt:
logger.exception(e)
error = "Query cancelled by user."
json_data = None
except requests.HTTPError as e:
logger.exception(e)
error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
json_data = None
except requests.exceptions.RequestException as e:
logger.exception(e)
error = "Connection refused"
json_data = None
except Exception as e:
logger.exception(e)
raise sys.exc_info()[1], None, sys.exc_info()[2]

return json_data, error
Expand Down

0 comments on commit 52a5a97

Please sign in to comment.