def _get_mappings(self, url):
    """Fetch index mappings from ``url`` and flatten them into column types.

    Args:
        url: URL of the ``_mapping`` endpoint to request.

    Returns:
        A ``(mappings, error)`` tuple. On success ``mappings`` is a dict of
        property name -> type constant and ``error`` is ``None``. When the
        request fails (HTTP error status or any other ``requests`` failure)
        ``mappings`` is ``None`` and ``error`` is a message string.
    """
    mappings = {}
    error = None

    try:
        r = requests.get(url, auth=self.auth)
        r.raise_for_status()

        mappings_data = r.json()

        for index_mappings in mappings_data.values():
            for type_mapping in index_mappings.get("mappings", {}).values():
                # A mapping type that declares no fields has no "properties"
                # key; treat it as empty instead of raising KeyError.
                properties = type_mapping.get("properties", {})
                for property_name, property_data in properties.items():
                    # The first definition seen for a property name wins.
                    if property_name in mappings:
                        continue

                    property_type = property_data.get("type", None)
                    if property_type:
                        # Types missing from the table are reported as strings
                        # rather than rejected.
                        mappings[property_name] = ELASTICSEARCH_TYPES_MAPPING.get(property_type, TYPE_STRING)
    except requests.HTTPError as e:
        logger.exception(e)
        # `r` is always bound here: HTTPError comes from raise_for_status().
        error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text)
        mappings = None
    except requests.exceptions.RequestException as e:
        logger.exception(e)
        error = "Connection refused"
        mappings = None

    return mappings, error
def test_connection(self):
    """Verify that the server answers a ``/_cluster/health`` request.

    Returns nothing on success.

    Raises:
        Exception: if the server responds with an HTTP error status, or if
            the request fails for any other ``requests``-level reason.
    """
    try:
        r = requests.get("{0}/_cluster/health".format(self.server_url), auth=self.auth)
        r.raise_for_status()
    except requests.HTTPError as e:
        logger.exception(e)
        # Raise instead of only building the message: a swallowed failure
        # would make every connection test look successful.
        raise Exception("Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text))
    except requests.exceptions.RequestException as e:
        logger.exception(e)
        raise Exception("Connection refused")
Return Code: {0} Reason: {1}".format(r.status_code, r.text) + json_data = None + except requests.exceptions.RequestException as e: + logger.exception(e) + error = "Connection refused" + json_data = None except Exception as e: + logger.exception(e) raise sys.exc_info()[1], None, sys.exc_info()[2] return json_data, error @@ -353,11 +370,14 @@ def run_query(self, query, user): mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name) mappings = self._get_mappings(mapping_url) + if error: + return None, error params = {"source": json.dumps(query_dict)} logger.debug("Using URL: %s", url) logger.debug("Using params : %s", params) r = requests.get(url, params=params, auth=self.auth) + r.raise_for_status() logger.debug("Result: %s", r.json()) result_columns = [] @@ -365,13 +385,23 @@ def run_query(self, query, user): self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows) json_data = json.dumps({ - "columns" : result_columns, - "rows" : result_rows + "columns": result_columns, + "rows": result_rows }) except KeyboardInterrupt: + logger.exception(e) error = "Query cancelled by user." json_data = None + except requests.HTTPError as e: + logger.exception(e) + error = "Failed to execute query. Return Code: {0} Reason: {1}".format(r.status_code, r.text) + json_data = None + except requests.exceptions.RequestException as e: + logger.exception(e) + error = "Connection refused" + json_data = None except Exception as e: + logger.exception(e) raise sys.exc_info()[1], None, sys.exc_info()[2] return json_data, error