From 4aae9fe57bb0a2d2ea5f423a2206aac407d7cdd5 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Fri, 2 Jun 2017 10:51:23 -0700 Subject: [PATCH] Elastic search integration --- superset/config.py | 1 + superset/connectors/druid/views.py | 5 +- superset/connectors/elastic/__init__.py | 2 + superset/connectors/elastic/models.py | 583 ++++++++++++++++++ superset/connectors/elastic/views.py | 285 +++++++++ .../versions/b97e54338b27_elasticsearch.py | 112 ++++ superset/views/core.py | 1 - 7 files changed, 985 insertions(+), 4 deletions(-) create mode 100644 superset/connectors/elastic/__init__.py create mode 100644 superset/connectors/elastic/models.py create mode 100644 superset/connectors/elastic/views.py create mode 100644 superset/migrations/versions/b97e54338b27_elasticsearch.py diff --git a/superset/config.py b/superset/config.py index 6c38fa2f76b51..4c207f745789c 100644 --- a/superset/config.py +++ b/superset/config.py @@ -190,6 +190,7 @@ DEFAULT_MODULE_DS_MAP = OrderedDict([ ('superset.connectors.sqla.models', ['SqlaTable']), ('superset.connectors.druid.models', ['DruidDatasource']), + ('superset.connectors.elastic.models', ['ElasticDatasource']), ]) ADDITIONAL_MODULE_DS_MAP = {} ADDITIONAL_MIDDLEWARE = [] diff --git a/superset/connectors/druid/views.py b/superset/connectors/druid/views.py index 7ae3fb37096aa..34cdc357ed89d 100644 --- a/superset/connectors/druid/views.py +++ b/superset/connectors/druid/views.py @@ -21,6 +21,8 @@ from . import models +appbuilder.add_separator("Sources", ) + class DruidColumnInlineView(CompactCRUDMixin, SupersetModelView): # noqa datamodel = SQLAInterface(models.DruidColumn) @@ -283,6 +285,3 @@ def refresh_datasources(self): category_label=__("Sources"), category_icon='fa-database', icon="fa-cog") - - -appbuilder.add_separator("Sources", ) diff --git a/superset/connectors/elastic/__init__.py b/superset/connectors/elastic/__init__.py new file mode 100644 index 0000000000000..b2df79851f224 --- /dev/null +++ b/superset/connectors/elastic/__init__.py @@ -0,0 +1,2 @@ +from . import models # noqa +from . import views # noqa diff --git a/superset/connectors/elastic/models.py b/superset/connectors/elastic/models.py new file mode 100644 index 0000000000000..609e14021129e --- /dev/null +++ b/superset/connectors/elastic/models.py @@ -0,0 +1,583 @@ +# pylint: disable=invalid-unary-operand-type +import json +import logging +from datetime import datetime, timedelta +from six import string_types + +import requests +import sqlalchemy as sa +from sqlalchemy import ( + Column, Integer, String, ForeignKey, Text, Boolean, + DateTime, +) +from sqlalchemy.orm import backref, relationship +from dateutil.parser import parse as dparse + +from flask import Markup, escape +from flask_appbuilder.models.decorators import renders +from flask_appbuilder import Model + +from flask_babel import lazy_gettext as _ + +from elasticsearch import Elasticsearch + +from superset import conf, db, import_util, utils, sm, get_session +from superset.utils import flasher +from superset.connectors.base import BaseDatasource, BaseColumn, BaseMetric +from superset.models.helpers import AuditMixinNullable, QueryResult, set_perm + + +class ElasticCluster(Model, AuditMixinNullable): + + """ORM object referencing the Elastic clusters""" + + __tablename__ = 'elastic_clusters' + type = "elastic" + + id = Column(Integer, primary_key=True) + cluster_name = Column(String(250), unique=True) + hosts_json = Column(Text) + metadata_last_refreshed = Column(DateTime) + cache_timeout = Column(Integer) + + def __repr__(self): + return self.cluster_name + + @property + def hosts(self): + return json.loads(self.hosts_json) + + def get_client(self): + return Elasticsearch(self.hosts) + + def get_mappings(self): + client = self.get_client() + return client.indices.get_mapping() + + def refresh_datasources(self, datasource_name=None, merge_flag=False): + """Refresh metadata of all datasources in the cluster + If ``datasource_name`` is specified, only that datasource is updated + """ + for index_name, index_metadata in self.get_mappings().items(): + for name, mapping_metadata in index_metadata.get('mappings').items(): + ElasticDatasource.sync_to_db( + '{}.{}'.format(index_name, name), mapping_metadata, self) + + @property + def perm(self): + return "[{obj.cluster_name}].(id:{obj.id})".format(obj=self) + + def get_perm(self): + return self.perm + + @property + def name(self): + return self.verbose_name if self.verbose_name else self.cluster_name + + @property + def unique_name(self): + return self.verbose_name if self.verbose_name else self.cluster_name + + +class ElasticColumn(Model, BaseColumn): + """ORM model for storing Elastic datasource column metadata""" + + __tablename__ = 'elastic_columns' + + datasource_name = Column( + String(255), + ForeignKey('elastic_datasources.datasource_name')) + # Setting enable_typechecks=False disables polymorphic inheritance. + datasource = relationship( + 'ElasticDatasource', + backref=backref('columns', cascade='all, delete-orphan'), + enable_typechecks=False) + json = Column(Text) + + export_fields = ( + 'datasource_name', 'column_name', 'is_active', 'type', 'groupby', + 'count_distinct', 'sum', 'avg', 'max', 'min', 'filterable', + 'description', 'dimension_spec_json' + ) + + @property + def expression(self): + return self.json + + def __repr__(self): + return self.column_name + + @property + def dimension_spec(self): + if self.dimension_spec_json: + return json.loads(self.dimension_spec_json) + + def generate_metrics(self): + """Generate metrics based on the column metadata""" + M = ElasticMetric # noqa + metrics = [] + metrics.append(ElasticMetric( + metric_name='count', + verbose_name='COUNT(*)', + metric_type='count', + json=json.dumps({'type': 'count', 'name': 'count'}) + )) + if self.sum and self.is_num: + mt = self.type.lower() + 'Sum' + name = 'sum__' + self.column_name + metrics.append(ElasticMetric( + metric_name=name, + metric_type='sum', + verbose_name='SUM({})'.format(self.column_name), + json=json.dumps({'sum': {'field': self.column_name}}) + )) + + if self.avg and self.is_num: + mt = self.type.lower() + 'Avg' + name = 'avg__' + self.column_name + metrics.append(ElasticMetric( + metric_name=name, + metric_type='avg', + verbose_name='AVG({})'.format(self.column_name), + json=json.dumps({'avg': {'field': self.column_name}}) + )) + + if self.min and self.is_num: + mt = self.type.lower() + 'Min' + name = 'min__' + self.column_name + metrics.append(ElasticMetric( + metric_name=name, + metric_type='min', + verbose_name='MIN({})'.format(self.column_name), + json=json.dumps({'min': {'field': self.column_name}}) + )) + if self.max and self.is_num: + mt = self.type.lower() + 'Max' + name = 'max__' + self.column_name + metrics.append(ElasticMetric( + metric_name=name, + metric_type='max', + verbose_name='MAX({})'.format(self.column_name), + json=json.dumps({'max': {'field': self.column_name}}) + )) + if self.count_distinct: + mt = 'count_distinct' + metrics.append(ElasticMetric( + metric_name=name, + verbose_name='COUNT(DISTINCT {})'.format(self.column_name), + metric_type='count_distinct', + json=json.dumps({'cardinality': {'field': self.column_name}}) + )) + session = get_session() + new_metrics = [] + for metric in metrics: + m = ( + session.query(M) + .filter(M.metric_name == metric.metric_name) + .filter(M.datasource_name == self.datasource_name) + .filter(ElasticCluster.cluster_name == self.datasource.cluster_name) + .first() + ) + metric.datasource_name = self.datasource_name + if not m: + new_metrics.append(metric) + session.add(metric) + session.flush() + + @classmethod + def import_obj(cls, i_column): + def lookup_obj(lookup_column): + return db.session.query(ElasticColumn).filter( + ElasticColumn.datasource_name == lookup_column.datasource_name, + ElasticColumn.column_name == lookup_column.column_name).first() + + return import_util.import_simple_obj(db.session, i_column, lookup_obj) + + +class ElasticMetric(Model, BaseMetric): + + """ORM object referencing Elastic metrics for a datasource""" + + __tablename__ = 'elastic_metrics' + datasource_name = Column( + String(255), + ForeignKey('elastic_datasources.datasource_name')) + # Setting enable_typechecks=False disables polymorphic inheritance. + datasource = relationship( + 'ElasticDatasource', + backref=backref('metrics', cascade='all, delete-orphan'), + enable_typechecks=False) + json = Column(Text) + + export_fields = ( + 'metric_name', 'verbose_name', 'metric_type', 'datasource_name', + 'json', 'description', 'is_restricted', 'd3format' + ) + + @property + def expression(self): + return self.json + + @property + def json_obj(self): + try: + obj = json.loads(self.json) + except Exception: + obj = {} + return obj + + @property + def perm(self): + return ( + "{parent_name}.[{obj.metric_name}](id:{obj.id})" + ).format(obj=self, + parent_name=self.datasource.full_name + ) if self.datasource else None + + @classmethod + def import_obj(cls, i_metric): + def lookup_obj(lookup_metric): + return db.session.query(ElasticMetric).filter( + ElasticMetric.datasource_name == lookup_metric.datasource_name, + ElasticMetric.metric_name == lookup_metric.metric_name).first() + return import_util.import_simple_obj(db.session, i_metric, lookup_obj) + + +class ElasticDatasource(Model, BaseDatasource): + + """ORM object referencing Elastic datasources (tables)""" + + __tablename__ = 'elastic_datasources' + + type = "elastic" + query_langtage = "json" + cluster_class = ElasticCluster + metric_class = ElasticMetric + column_class = ElasticColumn + + baselink = "elasticdatasourcemodelview" + + # Columns + datasource_name = Column(String(255), unique=True) + is_hidden = Column(Boolean, default=False) + fetch_values_from = Column(String(100)) + cluster_name = Column( + String(250), ForeignKey('elastic_clusters.cluster_name')) + cluster = relationship( + 'ElasticCluster', backref='datasources', foreign_keys=[cluster_name]) + user_id = Column(Integer, ForeignKey('ab_user.id')) + owner = relationship( + sm.user_model, + backref=backref('elastic_datasources', cascade='all, delete-orphan'), + foreign_keys=[user_id]) + + export_fields = ( + 'datasource_name', 'is_hidden', 'description', 'default_endpoint', + 'cluster_name', 'offset', 'cache_timeout', 'params' + ) + slices = relationship( + 'Slice', + primaryjoin=( + "ElasticDatasource.id == foreign(Slice.datasource_id) and " + "Slice.datasource_type == 'elastic'")) + + @property + def database(self): + return self.cluster + + @property + def num_cols(self): + return [c.column_name for c in self.columns if c.is_num] + + @property + def name(self): + return self.datasource_name + + @property + def schema(self): + ds_name = self.datasource_name or '' + name_pieces = ds_name.split('.') + if len(name_pieces) > 1: + return name_pieces[0] + else: + return None + + @property + def schema_perm(self): + """Returns schema permission if present, cluster one otherwise.""" + return utils.get_schema_perm(self.cluster, self.schema) + + def get_perm(self): + return ( + "[{obj.cluster_name}].[{obj.datasource_name}]" + "(id:{obj.id})").format(obj=self) + + @property + def link(self): + name = escape(self.datasource_name) + return Markup('{name}').format(**locals()) + + @property + def full_name(self): + return utils.get_datasource_full_name( + self.cluster_name, self.datasource_name) + + @property + def time_column_grains(self): + return { + "time_columns": [ + 'all', '5 seconds', '30 seconds', '1 minute', + '5 minutes', '1 hour', '6 hour', '1 day', '7 days', + 'week', 'week_starting_sunday', 'week_ending_saturday', + 'month', + ], + "time_grains": ['now'] + } + + def __repr__(self): + return self.datasource_name + + @renders('datasource_name') + def datasource_link(self): + url = "/superset/explore/{obj.type}/{obj.id}/".format(obj=self) + name = escape(self.datasource_name) + return Markup('{name}'.format(**locals())) + + def get_metric_obj(self, metric_name): + return [ + m.json_obj for m in self.metrics + if m.metric_name == metric_name + ][0] + + @classmethod + def import_obj(cls, i_datasource, import_time=None): + """Imports the datasource from the object to the database. + + Metrics and columns and datasource will be overridden if exists. + This function can be used to import/export dashboards between multiple + superset instances. Audit metadata isn't copies over. + """ + def lookup_datasource(d): + return db.session.query(ElasticDatasource).join(ElasticCluster).filter( + ElasticDatasource.datasource_name == d.datasource_name, + ElasticCluster.cluster_name == d.cluster_name, + ).first() + + def lookup_cluster(d): + return db.session.query(ElasticCluster).filter_by( + cluster_name=d.cluster_name).one() + return import_util.import_datasource( + db.session, i_datasource, lookup_cluster, lookup_datasource, + import_time) + + @staticmethod + def version_higher(v1, v2): + """is v1 higher than v2 + + >>> ElasticDatasource.version_higher('0.8.2', '0.9.1') + False + >>> ElasticDatasource.version_higher('0.8.2', '0.6.1') + True + >>> ElasticDatasource.version_higher('0.8.2', '0.8.2') + False + >>> ElasticDatasource.version_higher('0.8.2', '0.9.BETA') + False + >>> ElasticDatasource.version_higher('0.8.2', '0.9') + False + """ + def int_or_0(v): + try: + v = int(v) + except (TypeError, ValueError): + v = 0 + return v + v1nums = [int_or_0(n) for n in v1.split('.')] + v2nums = [int_or_0(n) for n in v2.split('.')] + v1nums = (v1nums + [0, 0, 0])[:3] + v2nums = (v2nums + [0, 0, 0])[:3] + return v1nums[0] > v2nums[0] or \ + (v1nums[0] == v2nums[0] and v1nums[1] > v2nums[1]) or \ + (v1nums[0] == v2nums[0] and v1nums[1] == v2nums[1] and v1nums[2] > v2nums[2]) + + def generate_metrics(self): + for col in self.columns: + col.generate_metrics() + + def query_str(self): + d = {"query": None} + return json.dumps(d) + + @classmethod + def sync_to_db(cls, name, metadata, cluster): + """Fetches metadata for that datasource and merges the Superset db""" + logging.info("Syncing Elastic datasource [{}]".format(name)) + session = get_session() + datasource = session.query(cls).filter_by(datasource_name=name).first() + if not datasource: + datasource = cls(datasource_name=name) + session.add(datasource) + flasher("Adding new datasource [{}]".format(name), "success") + else: + flasher("Refreshing datasource [{}]".format(name), "info") + session.flush() + datasource.cluster = cluster + session.flush() + + for col_name, col_metadata in metadata.get('properties').items(): + cls.merge_column(col_name, col_metadata, datasource, session) + + @classmethod + def merge_column(cls, col_name, col_metadata, datasource, sesh): + col_obj = ( + sesh + .query(ElasticColumn) + .filter_by( + datasource_name=datasource.datasource_name, + column_name=col_name) + .first() + ) + datatype = col_metadata.get('type') + if not col_obj: + col_obj = ElasticColumn( + datasource_name=datasource.datasource_name, + column_name=col_name) + sesh.add(col_obj) + if datatype == "string": + col_obj.groupby = True + col_obj.filterable = True + if col_obj.is_num: + col_obj.sum = True + if col_obj: + col_obj.type = datatype + sesh.flush() + col_obj.datasource = datasource + col_obj.generate_metrics() + sesh.flush() + + + @staticmethod + def time_offset(granularity): + if granularity == 'week_ending_saturday': + return 6 * 24 * 3600 * 1000 # 6 days + return 0 + + # uses https://en.wikipedia.org/wiki/ISO_8601 + # http://elastic.io/docs/0.8.0/querying/granularities.html + # TODO: pass origin from the UI + @staticmethod + def granularity(period_name, timezone=None, origin=None): + if not period_name or period_name == 'all': + return 'all' + iso_8601_dict = { + '5 seconds': 'PT5S', + '30 seconds': 'PT30S', + '1 minute': 'PT1M', + '5 minutes': 'PT5M', + '1 hour': 'PT1H', + '6 hour': 'PT6H', + 'one day': 'P1D', + '1 day': 'P1D', + '7 days': 'P7D', + 'week': 'P1W', + 'week_starting_sunday': 'P1W', + 'week_ending_saturday': 'P1W', + 'month': 'P1M', + } + + granularity = {'type': 'period'} + if timezone: + granularity['timeZone'] = timezone + + if origin: + dttm = utils.parse_human_datetime(origin) + granularity['origin'] = dttm.isoformat() + + if period_name in iso_8601_dict: + granularity['period'] = iso_8601_dict[period_name] + if period_name in ('week_ending_saturday', 'week_starting_sunday'): + # use Sunday as start of the week + granularity['origin'] = '2016-01-03T00:00:00' + elif not isinstance(period_name, string_types): + granularity['type'] = 'duration' + granularity['duration'] = period_name + elif period_name.startswith('P'): + # identify if the string is the iso_8601 period + granularity['period'] = period_name + else: + granularity['type'] = 'duration' + granularity['duration'] = utils.parse_human_timedelta( + period_name).total_seconds() * 1000 + return granularity + + def values_for_column(self, + column_name, + limit=10000): + """Retrieve some values for the given column""" + # TODO + + + def get_query_str(self, query_obj, phase=1, client=None): + return self.run_query(client=client, phase=phase, **query_obj) + + def run_query( # noqa / elastic + self, + groupby, metrics, + granularity, + from_dttm, to_dttm, + filter=None, # noqa + is_timeseries=True, + timeseries_limit=None, + timeseries_limit_metric=None, + row_limit=None, + inner_from_dttm=None, inner_to_dttm=None, + orderby=None, + extras=None, # noqa + select=None, # noqa + columns=None, phase=2, client=None, form_data=None): + """Runs a query against Elastic and returns a dataframe. + """ + pass + + @property + def index(self): + self.datasource_name.split('.')[0] + + def query(self, query_obj): + client = self.cluster.get_client() + equery = {} + + # Aggregations + equery['aggregations'] = {} + for m in self.metrics: + if m.metric_name in query_obj.get('metrics'): + equery['aggregations'][m.metric_name] = m.json_obj + + print(equery) + data = client.search(index=self.index, body=equery) + from pprint import pprint + print('-='*20) + pprint(data) + print('-='*20) + query_str = self.query_str() + qry_start_dttm = datetime.now() + df = None + return QueryResult( + df=df, + query=query_str, + duration=datetime.now() - qry_start_dttm) + + def get_filters(self, raw_filters): # noqa + return + + @classmethod + def query_datasources_by_name( + cls, session, database, datasource_name, schema=None): + return ( + session.query(cls) + .filter_by(cluster_name=database.id) + .filter_by(datasource_name=datasource_name) + .all() + ) + +sa.event.listen(ElasticDatasource, 'after_insert', set_perm) +sa.event.listen(ElasticDatasource, 'after_update', set_perm) diff --git a/superset/connectors/elastic/views.py b/superset/connectors/elastic/views.py new file mode 100644 index 0000000000000..604d8c1ecc448 --- /dev/null +++ b/superset/connectors/elastic/views.py @@ -0,0 +1,285 @@ +from datetime import datetime +import logging + +import sqlalchemy as sqla + +from flask import Markup, flash, redirect +from flask_appbuilder import CompactCRUDMixin, expose +from flask_appbuilder.models.sqla.interface import SQLAInterface + +from flask_babel import lazy_gettext as _ +from flask_babel import gettext as __ + +from superset import db, utils, appbuilder, sm, security +from superset.connectors.connector_registry import ConnectorRegistry +from superset.utils import has_access +from superset.views.base import BaseSupersetView +from superset.views.base import ( + SupersetModelView, validate_json, DeleteMixin, ListWidgetWithCheckboxes, + DatasourceFilter, get_datasource_exist_error_mgs) + +from . import models + +appbuilder.add_separator("Sources", ) + + +class ElasticColumnInlineView(CompactCRUDMixin, SupersetModelView): # noqa + datamodel = SQLAInterface(models.ElasticColumn) + edit_columns = [ + 'column_name', 'description', 'json', 'datasource', + 'groupby', 'filterable', 'count_distinct', 'sum', 'min', 'max'] + add_columns = edit_columns + list_columns = [ + 'column_name', 'type', 'groupby', 'filterable', 'count_distinct', + 'sum', 'min', 'max'] + can_delete = False + page_size = 500 + label_columns = { + 'column_name': _("Column"), + 'type': _("Type"), + 'datasource': _("Datasource"), + 'groupby': _("Groupable"), + 'filterable': _("Filterable"), + 'count_distinct': _("Count Distinct"), + 'sum': _("Sum"), + 'min': _("Min"), + 'max': _("Max"), + } + description_columns = { + 'filterable': _( + "Whether this column is exposed in the `Filters` section " + "of the explore view."), + 'json': utils.markdown( + "this field can be used to specify " + "a `dimensionSpec` as documented [here]" + "(http://elastic.io/docs/latest/querying/dimensionspecs.html). " + "Make sure to input valid JSON and that the " + "`outputName` matches the `column_name` defined " + "above.", + True), + } + + def post_update(self, col): + col.generate_metrics() + utils.validate_json(col.json) + + def post_add(self, col): + self.post_update(col) + +appbuilder.add_view_no_menu(ElasticColumnInlineView) + + +class ElasticMetricInlineView(CompactCRUDMixin, SupersetModelView): # noqa + datamodel = SQLAInterface(models.ElasticMetric) + list_columns = ['metric_name', 'verbose_name', 'metric_type'] + edit_columns = [ + 'metric_name', 'description', 'verbose_name', 'metric_type', 'json', + 'datasource', 'd3format', 'is_restricted'] + add_columns = edit_columns + page_size = 500 + validators_columns = { + 'json': [validate_json], + } + description_columns = { + 'metric_type': utils.markdown( + "use `postagg` as the metric type if you are defining a " + "[Elastic Post Aggregation]" + "(http://elastic.io/docs/latest/querying/post-aggregations.html)", + True), + 'is_restricted': _("Whether the access to this metric is restricted " + "to certain roles. Only roles with the permission " + "'metric access on XXX (the name of this metric)' " + "are allowed to access this metric"), + } + label_columns = { + 'metric_name': _("Metric"), + 'description': _("Description"), + 'verbose_name': _("Verbose Name"), + 'metric_type': _("Type"), + 'json': _("JSON"), + 'datasource': _("Elastic Datasource"), + } + + def post_add(self, metric): + if metric.is_restricted: + security.merge_perm(sm, 'metric_access', metric.get_perm()) + + def post_update(self, metric): + if metric.is_restricted: + security.merge_perm(sm, 'metric_access', metric.get_perm()) + +appbuilder.add_view_no_menu(ElasticMetricInlineView) + + +class ElasticClusterModelView(SupersetModelView, DeleteMixin): # noqa + datamodel = SQLAInterface(models.ElasticCluster) + add_columns = [ + 'cluster_name', 'hosts_json', 'cache_timeout', + ] + edit_columns = add_columns + list_columns = ['cluster_name', 'metadata_last_refreshed'] + search_columns = ('cluster_name',) + label_columns = { + 'cluster_name': _("Cluster"), + 'hosts_json': _("Hosts JSON configuration") + } + description_columns = { + 'hosts_json': _( + "A JSON string that represents a host, and array of host, " + "or anything else that ``elasticsearch.Elasticsearch()`` will " + "be able to interpret"), + } + + def pre_add(self, cluster): + security.merge_perm(sm, 'database_access', cluster.perm) + + def pre_update(self, cluster): + self.pre_add(cluster) + + def _delete(self, pk): + DeleteMixin._delete(self, pk) + +appbuilder.add_view( + ElasticClusterModelView, + name="Elastic Clusters", + label=__("Elastic Clusters"), + icon="fa-cubes", + category="Sources", + category_label=__("Sources"), + category_icon='fa-database',) + + +class ElasticDatasourceModelView(SupersetModelView, DeleteMixin): # noqa + datamodel = SQLAInterface(models.ElasticDatasource) + list_widget = ListWidgetWithCheckboxes + list_columns = [ + 'datasource_link', 'cluster', 'changed_by_', 'modified'] + order_columns = [ + 'datasource_link', 'changed_on_', 'offset'] + related_views = [ElasticColumnInlineView, ElasticMetricInlineView] + edit_columns = [ + 'datasource_name', 'cluster', 'slices', 'description', 'owner', + 'is_hidden', + 'filter_select_enabled', 'fetch_values_from', + 'default_endpoint', 'offset', 'cache_timeout'] + search_columns = ( + 'datasource_name', 'cluster', 'description', 'owner' + ) + add_columns = edit_columns + show_columns = add_columns + ['perm'] + page_size = 500 + base_order = ('datasource_name', 'asc') + description_columns = { + 'slices': _( + "The list of slices associated with this table. By " + "altering this datasource, you may change how these associated " + "slices behave. " + "Also note that slices need to point to a datasource, so " + "this form will fail at saving if removing slices from a " + "datasource. If you want to change the datasource for a slice, " + "overwrite the slice from the 'explore view'"), + 'offset': _("Timezone offset (in hours) for this datasource"), + 'description': Markup( + "Supports markdown"), + 'fetch_values_from': _( + "Time expression to use as a predicate when retrieving " + "distinct values to populate the filter component. " + "Only applies when `Enable Filter Select` is on. If " + "you enter `7 days ago`, the distinct list of values in " + "the filter will be populated based on the distinct value over " + "the past week"), + 'filter_select_enabled': _( + "Whether to populate the filter's dropdown in the explore " + "view's filter section with a list of distinct values fetched " + "from the backend on the fly"), + 'default_endpoint': _( + "Redirects to this endpoint when clicking on the datasource " + "from the datasource list"), + } + base_filters = [['id', DatasourceFilter, lambda: []]] + label_columns = { + 'slices': _("Associated Slices"), + 'datasource_link': _("Data Source"), + 'cluster': _("Cluster"), + 'description': _("Description"), + 'owner': _("Owner"), + 'is_hidden': _("Is Hidden"), + 'filter_select_enabled': _("Enable Filter Select"), + 'default_endpoint': _("Default Endpoint"), + 'offset': _("Time Offset"), + 'cache_timeout': _("Cache Timeout"), + } + + def pre_add(self, datasource): + number_of_existing_datasources = db.session.query( + sqla.func.count('*')).filter( + models.ElasticDatasource.datasource_name == + datasource.datasource_name, + models.ElasticDatasource.cluster_name == datasource.cluster.id + ).scalar() + + # table object is already added to the session + if number_of_existing_datasources > 1: + raise Exception(get_datasource_exist_error_mgs( + datasource.full_name)) + + def post_add(self, datasource): + datasource.generate_metrics() + security.merge_perm(sm, 'datasource_access', datasource.get_perm()) + if datasource.schema: + security.merge_perm(sm, 'schema_access', datasource.schema_perm) + + def post_update(self, datasource): + self.post_add(datasource) + + def _delete(self, pk): + DeleteMixin._delete(self, pk) + +appbuilder.add_view( + ElasticDatasourceModelView, + "Elastic Datasources", + label=__("Elastic Datasources"), + category="Sources", + category_label=__("Sources"), + icon="fa-cube") + + +class Elastic(BaseSupersetView): + """The base views for Superset!""" + + @has_access + @expose("/refresh_datasources/") + def refresh_datasources(self): + """endpoint that refreshes elastic datasources metadata""" + session = db.session() + ElasticCluster = ConnectorRegistry.sources['elastic'].cluster_class + for cluster in session.query(ElasticCluster).all(): + cluster_name = cluster.cluster_name + try: + cluster.refresh_datasources() + except Exception as e: + flash( + "Error while processing cluster '{}'\n{}".format( + cluster_name, utils.error_msg_from_exception(e)), + "danger") + logging.exception(e) + return redirect('/elasticclustermodelview/list/') + cluster.metadata_last_refreshed = datetime.now() + flash( + "Refreshed metadata from cluster " + "[" + cluster.cluster_name + "]", + 'info') + session.commit() + return redirect("/elasticdatasourcemodelview/list/") + +appbuilder.add_view_no_menu(Elastic) + +appbuilder.add_link( + "Refresh Elastic Metadata", + label=__("Refresh Elastic Metadata"), + href='/elastic/refresh_datasources/', + category='Sources', + category_label=__("Sources"), + category_icon='fa-database', + icon="fa-cog") diff --git a/superset/migrations/versions/b97e54338b27_elasticsearch.py b/superset/migrations/versions/b97e54338b27_elasticsearch.py new file mode 100644 index 0000000000000..e5e5e0456b7d3 --- /dev/null +++ b/superset/migrations/versions/b97e54338b27_elasticsearch.py @@ -0,0 +1,112 @@ +"""elasticsearch + +Revision ID: b97e54338b27 +Revises: a65458420354 +Create Date: 2017-06-11 22:03:07.505841 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'b97e54338b27' +down_revision = 'a65458420354' + + +def upgrade(): + op.create_table( + 'elastic_clusters', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('cluster_name', sa.String(length=250), nullable=True), + sa.Column('hosts_json', sa.Text(), nullable=True), + sa.Column('metadata_last_refreshed', sa.DateTime(), nullable=True), + sa.Column('cache_timeout', sa.Integer(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('cluster_name') + ) + op.create_table( + 'elastic_datasources', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('default_endpoint', sa.Text(), nullable=True), + sa.Column('is_featured', sa.Boolean(), nullable=True), + sa.Column('filter_select_enabled', sa.Boolean(), nullable=True), + sa.Column('offset', sa.Integer(), nullable=True), + sa.Column('cache_timeout', sa.Integer(), nullable=True), + sa.Column('params', sa.String(length=1000), nullable=True), + sa.Column('perm', sa.String(length=1000), nullable=True), + sa.Column('datasource_name', sa.String(length=255), nullable=True), + sa.Column('is_hidden', sa.Boolean(), nullable=True), + sa.Column('fetch_values_from', sa.String(length=100), nullable=True), + sa.Column('cluster_name', sa.String(length=250), nullable=True), + sa.Column('user_id', sa.Integer(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['cluster_name'], ['elastic_clusters.cluster_name'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('datasource_name') + ) + op.create_table( + 'elastic_columns', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('column_name', sa.String(length=255), nullable=True), + sa.Column('verbose_name', sa.String(length=1024), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=True), + sa.Column('type', sa.String(length=32), nullable=True), + sa.Column('groupby', sa.Boolean(), nullable=True), + sa.Column('count_distinct', sa.Boolean(), nullable=True), + sa.Column('sum', sa.Boolean(), nullable=True), + sa.Column('avg', sa.Boolean(), nullable=True), + sa.Column('max', sa.Boolean(), nullable=True), + sa.Column('min', sa.Boolean(), nullable=True), + sa.Column('filterable', sa.Boolean(), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('datasource_name', sa.String(length=255), nullable=True), + sa.Column('json', sa.Text(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['datasource_name'], ['elastic_datasources.datasource_name'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_table( + 'elastic_metrics', + sa.Column('created_on', sa.DateTime(), nullable=True), + sa.Column('changed_on', sa.DateTime(), nullable=True), + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('metric_name', sa.String(length=512), nullable=True), + sa.Column('verbose_name', sa.String(length=1024), nullable=True), + sa.Column('metric_type', sa.String(length=32), nullable=True), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('is_restricted', sa.Boolean(), nullable=True), + sa.Column('d3format', sa.String(length=128), nullable=True), + sa.Column('datasource_name', sa.String(length=255), nullable=True), + sa.Column('json', sa.Text(), nullable=True), + sa.Column('changed_by_fk', sa.Integer(), nullable=True), + sa.Column('created_by_fk', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ), + sa.ForeignKeyConstraint(['datasource_name'], ['elastic_datasources.datasource_name'], ), + sa.PrimaryKeyConstraint('id') + ) + + +def downgrade(): + op.drop_table('elastic_metrics') + op.drop_table('elastic_columns') + op.drop_table('elastic_datasources') + op.drop_table('elastic_clusters') diff --git a/superset/views/core.py b/superset/views/core.py index 06a0c1397a7f8..50ef4276e1116 100755 --- a/superset/views/core.py +++ b/superset/views/core.py @@ -2281,7 +2281,6 @@ class CssTemplateModelView(SupersetModelView, DeleteMixin): class CssTemplateAsyncModelView(CssTemplateModelView): list_columns = ['template_name', 'css'] -appbuilder.add_separator("Sources") appbuilder.add_view( CssTemplateModelView, "CSS Templates",