diff --git a/redash/query_runner/cass.py b/redash/query_runner/cass.py new file mode 100644 index 0000000000..73afc539f4 --- /dev/null +++ b/redash/query_runner/cass.py @@ -0,0 +1,105 @@ +import json +import sys +import logging + +from redash.query_runner import * +from redash.utils import JSONEncoder + +logger = logging.getLogger(__name__) + +try: + from cassandra.cluster import Cluster + enabled = True +except ImportError: + enabled = False + +class Cassandra(BaseQueryRunner): + @classmethod + def enabled(cls): + return enabled + + @classmethod + def configuration_schema(cls): + return { + 'type': 'object', + 'properties': { + 'host': { + 'type': 'string', + }, + 'port': { + 'type': 'number', + 'default': 9042, + }, + 'keyspace': { + 'type': 'string', + 'title': 'Keyspace name' + }, + 'username': { + 'type': 'string', + 'title': 'Username' + }, + 'password': { + 'type': 'string', + 'title': 'Password' + } + }, + 'required': ['keyspace', 'host'] + } + + @classmethod + def type(cls): + return "Cassandra" + + def _get_tables(self, schema): + query = """ + select columnfamily_name from system.schema_columnfamilies where keyspace_name = '{}'; + """.format(self.configuration['keyspace']) + + results = self.run_query(query) + return results, error + + def run_query(self, query): + from cassandra.cluster import Cluster + connection = None + try: + if self.configuration.get('username', '') and self.configuration.get('password', ''): + from cassandra.auth import PlainTextAuthProvider + auth_provider = PlainTextAuthProvider(username='{}'.format(self.configuration.get('username', '')), + password='{}'.format(self.configuration.get('password', ''))) + connection = Cluster([self.configuration.get('host', '')], auth_provider=auth_provider) + else: + connection = Cluster([self.configuration.get('host', '')]) + + session = connection.connect() + logger.debug("Cassandra running query: %s", query) + result = session.execute(query) + + column_names = result.column_names + + columns = self.fetch_columns(map(lambda c: (c, 'string'), column_names)) + + rows = [dict(zip(column_names, row)) for row in result] + + data = {'columns': columns, 'rows': rows} + json_data = json.dumps(data, cls=JSONEncoder) + + error = None + + except cassandra.cluster.Error, e: + error = e.args[1] + except KeyboardInterrupt: + error = "Query cancelled by user." + + return json_data, error + +class ScyllaDB(Cassandra): + + def __init__(self, configuration): + super(ScyllaDB, self).__init__(configuration) + + @classmethod + def type(cls): + return "scylla" + +register(Cassandra) +register(ScyllaDB) diff --git a/redash/query_runner/hive_ds.py b/redash/query_runner/hive_ds.py index 4dc131216c..683240b949 100644 --- a/redash/query_runner/hive_ds.py +++ b/redash/query_runner/hive_ds.py @@ -119,7 +119,6 @@ def run_query(self, query): data = {'columns': columns, 'rows': rows} json_data = json.dumps(data, cls=JSONEncoder) error = None - cursor.close() except KeyboardInterrupt: connection.cancel() error = "Query cancelled by user." diff --git a/requirements_all_ds.txt b/requirements_all_ds.txt index 6a37763af2..674567e3b0 100644 --- a/requirements_all_ds.txt +++ b/requirements_all_ds.txt @@ -16,3 +16,4 @@ botocore==1.4.4 sasl>=0.1.3 thrift>=0.8.0 thrift_sasl>=0.1.0 +cassandra-driver==3.1.1