diff --git a/redash/query_runner/qubole.py b/redash/query_runner/qubole.py index d62260cd5f..d261f4fcff 100644 --- a/redash/query_runner/qubole.py +++ b/redash/query_runner/qubole.py @@ -11,7 +11,8 @@ try: import qds_sdk from qds_sdk.qubole import Qubole as qbol - from qds_sdk.commands import Command, HiveCommand, PrestoCommand + from qds_sdk.commands import Command, HiveCommand + from qds_sdk.commands import SqlCommand, PrestoCommand enabled = True except ImportError: enabled = False @@ -24,6 +25,11 @@ def configuration_schema(cls): return { "type": "object", "properties": { + "query_type": { + "type": "string", + "title": "Query Type (quantum / presto / hive)", + "default": "hive" + }, "endpoint": { "type": "string", "title": "API Endpoint", @@ -37,18 +43,21 @@ def configuration_schema(cls): "type": "string", "title": "Cluster Label", "default": "default" - }, - "query_type": { - "type": "string", - "title": "Query Type (hive or presto)", - "default": "hive" } }, - "order": ["endpoint", "token", "cluster"], - "required": ["endpoint", "token", "cluster"], + "order": ["query_type", "endpoint", "token", "cluster"], + "required": ["endpoint", "token"], "secret": ["token"] } + @classmethod + def type(cls): + return "qubole" + + @classmethod + def name(cls): + return "Qubole" + @classmethod def enabled(cls): return enabled @@ -59,16 +68,26 @@ def annotate_query(cls): def test_connection(self): headers = self._get_header() - r = requests.head("%s/api/latest/users" % self.configuration['endpoint'], headers=headers) + r = requests.head("%s/api/latest/users" % self.configuration.get('endpoint'), headers=headers) r.status_code == 200 def run_query(self, query, user): - qbol.configure(api_token=self.configuration['token'], - api_url='%s/api' % self.configuration['endpoint']) + qbol.configure(api_token=self.configuration.get('token'), + api_url='%s/api' % self.configuration.get('endpoint')) try: - cls = PrestoCommand if(self.configuration['query_type'] == 'presto') else HiveCommand - cmd = cls.create(query=query, label=self.configuration['cluster']) + query_type = self.configuration.get('query_type', 'hive') + + if query_type == 'quantum': + cmd = SqlCommand.create(query=query) + elif query_type == 'hive': + cmd = HiveCommand.create(query=query, label=self.configuration.get('cluster')) + elif query_type == 'presto': + cmd = PrestoCommand.create(query=query, label=self.configuration.get('cluster')) + else: + raise Exception("Invalid Query Type:%s.\ + It must be : hive / presto / quantum." % self.configuration.get('query_type')) + logging.info("Qubole command created with Id: %s and Status: %s", cmd.id, cmd.status) while not Command.is_done(cmd.status): @@ -106,7 +125,7 @@ def get_schema(self, get_stats=False): try: headers = self._get_header() content = requests.get("%s/api/latest/hive?describe=true&per_page=10000" % - self.configuration['endpoint'], headers=headers) + self.configuration.get('endpoint'), headers=headers) data = content.json() for schema in data['schemas']: @@ -127,7 +146,7 @@ def get_schema(self, get_stats=False): def _get_header(self): return {"Content-type": "application/json", "Accept": "application/json", - "X-AUTH-TOKEN": self.configuration['token']} + "X-AUTH-TOKEN": self.configuration.get('token')} register(Qubole)