diff --git a/superset/config.py b/superset/config.py
index 6c38fa2f76b51..4c207f745789c 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -190,6 +190,7 @@
DEFAULT_MODULE_DS_MAP = OrderedDict([
('superset.connectors.sqla.models', ['SqlaTable']),
('superset.connectors.druid.models', ['DruidDatasource']),
+ ('superset.connectors.elastic.models', ['ElasticDatasource']),
])
ADDITIONAL_MODULE_DS_MAP = {}
ADDITIONAL_MIDDLEWARE = []
diff --git a/superset/connectors/druid/views.py b/superset/connectors/druid/views.py
index 7ae3fb37096aa..34cdc357ed89d 100644
--- a/superset/connectors/druid/views.py
+++ b/superset/connectors/druid/views.py
@@ -21,6 +21,8 @@
from . import models
+appbuilder.add_separator("Sources")
+
class DruidColumnInlineView(CompactCRUDMixin, SupersetModelView): # noqa
datamodel = SQLAInterface(models.DruidColumn)
@@ -283,6 +285,3 @@ def refresh_datasources(self):
category_label=__("Sources"),
category_icon='fa-database',
icon="fa-cog")
-
-
-appbuilder.add_separator("Sources", )
diff --git a/superset/connectors/elastic/__init__.py b/superset/connectors/elastic/__init__.py
new file mode 100644
index 0000000000000..b2df79851f224
--- /dev/null
+++ b/superset/connectors/elastic/__init__.py
@@ -0,0 +1,2 @@
+from . import models # noqa
+from . import views # noqa
diff --git a/superset/connectors/elastic/models.py b/superset/connectors/elastic/models.py
new file mode 100644
index 0000000000000..609e14021129e
--- /dev/null
+++ b/superset/connectors/elastic/models.py
@@ -0,0 +1,583 @@
+# pylint: disable=invalid-unary-operand-type
+import json
+import logging
+from datetime import datetime
+
+import pandas as pd
+import sqlalchemy as sa
+from six import string_types
+from sqlalchemy import (
+    Column, Integer, String, ForeignKey, Text, Boolean,
+    DateTime,
+)
+from sqlalchemy.orm import backref, relationship
+
+from flask import Markup, escape
+from flask_appbuilder.models.decorators import renders
+from flask_appbuilder import Model
+
+from elasticsearch import Elasticsearch
+
+from superset import db, import_util, utils, sm, get_session
+from superset.utils import flasher
+from superset.connectors.base import BaseDatasource, BaseColumn, BaseMetric
+from superset.models.helpers import AuditMixinNullable, QueryResult, set_perm
+
+
+class ElasticCluster(Model, AuditMixinNullable):
+
+ """ORM object referencing the Elastic clusters"""
+
+ __tablename__ = 'elastic_clusters'
+ type = "elastic"
+
+ id = Column(Integer, primary_key=True)
+ cluster_name = Column(String(250), unique=True)
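+    # ``hosts_json`` holds whatever ``elasticsearch.Elasticsearch`` accepts
+    # as its ``hosts`` argument, serialized to JSON, for instance
+    # (hypothetical hosts): '["localhost:9200"]' or
+    # '[{"host": "es1.example.com", "port": 9200}]'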
+ hosts_json = Column(Text)
+ metadata_last_refreshed = Column(DateTime)
+ cache_timeout = Column(Integer)
+
+ def __repr__(self):
+ return self.cluster_name
+
+ @property
+ def hosts(self):
+ return json.loads(self.hosts_json)
+
+ def get_client(self):
+ return Elasticsearch(self.hosts)
+
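+    # A sketch (hypothetical index and mapping names) of the structure
+    # ``get_mapping()`` returns; ``refresh_datasources`` below walks it and
+    # names each datasource '<index>.<mapping>':
+    #
+    #   {'logs': {'mappings': {'event': {'properties': {
+    #       'bytes': {'type': 'long'}}}}}}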
+ def get_mappings(self):
+ client = self.get_client()
+ return client.indices.get_mapping()
+
+    def refresh_datasources(self, datasource_name=None, merge_flag=False):
+        """Refresh metadata of all datasources in the cluster.
+
+        If ``datasource_name`` is specified, only that datasource is updated.
+        """
+        for index_name, index_metadata in self.get_mappings().items():
+            mappings = index_metadata.get('mappings') or {}
+            for mapping_name, mapping_metadata in mappings.items():
+                name = '{}.{}'.format(index_name, mapping_name)
+                if datasource_name and name != datasource_name:
+                    continue
+                ElasticDatasource.sync_to_db(name, mapping_metadata, self)
+
+ @property
+ def perm(self):
+ return "[{obj.cluster_name}].(id:{obj.id})".format(obj=self)
+
+ def get_perm(self):
+ return self.perm
+
+ @property
+ def name(self):
+        # the model defines no ``verbose_name`` column, so the cluster
+        # name doubles as the display name
+        return self.cluster_name
+
+ @property
+ def unique_name(self):
+        return self.cluster_name
+
+
+class ElasticColumn(Model, BaseColumn):
+ """ORM model for storing Elastic datasource column metadata"""
+
+ __tablename__ = 'elastic_columns'
+
+ datasource_name = Column(
+ String(255),
+ ForeignKey('elastic_datasources.datasource_name'))
+ # Setting enable_typechecks=False disables polymorphic inheritance.
+ datasource = relationship(
+ 'ElasticDatasource',
+ backref=backref('columns', cascade='all, delete-orphan'),
+ enable_typechecks=False)
+ json = Column(Text)
+
+ export_fields = (
+ 'datasource_name', 'column_name', 'is_active', 'type', 'groupby',
+ 'count_distinct', 'sum', 'avg', 'max', 'min', 'filterable',
+        'description', 'json'
+ )
+
+ @property
+ def expression(self):
+ return self.json
+
+ def __repr__(self):
+ return self.column_name
+
+    @property
+    def dimension_spec(self):
+        # the column's JSON definition lives in the ``json`` column
+        # (``dimension_spec_json`` is a Druid-ism this model doesn't define)
+        if self.json:
+            return json.loads(self.json)
+
+ def generate_metrics(self):
+ """Generate metrics based on the column metadata"""
+ M = ElasticMetric # noqa
+ metrics = []
+        metrics.append(ElasticMetric(
+            metric_name='count',
+            verbose_name='COUNT(*)',
+            metric_type='count',
+            # expressed as an ES aggregation; ``_index`` is a virtual field
+            # that exists on every document and can be aggregated on
+            json=json.dumps({'value_count': {'field': '_index'}})
+        ))
+        if self.sum and self.is_num:
+            name = 'sum__' + self.column_name
+            metrics.append(ElasticMetric(
+                metric_name=name,
+                metric_type='sum',
+                verbose_name='SUM({})'.format(self.column_name),
+                json=json.dumps({'sum': {'field': self.column_name}})
+            ))
+        if self.avg and self.is_num:
+            name = 'avg__' + self.column_name
+            metrics.append(ElasticMetric(
+                metric_name=name,
+                metric_type='avg',
+                verbose_name='AVG({})'.format(self.column_name),
+                json=json.dumps({'avg': {'field': self.column_name}})
+            ))
+        if self.min and self.is_num:
+            name = 'min__' + self.column_name
+            metrics.append(ElasticMetric(
+                metric_name=name,
+                metric_type='min',
+                verbose_name='MIN({})'.format(self.column_name),
+                json=json.dumps({'min': {'field': self.column_name}})
+            ))
+        if self.max and self.is_num:
+            name = 'max__' + self.column_name
+            metrics.append(ElasticMetric(
+                metric_name=name,
+                metric_type='max',
+                verbose_name='MAX({})'.format(self.column_name),
+                json=json.dumps({'max': {'field': self.column_name}})
+            ))
+        if self.count_distinct:
+            name = 'count_distinct__' + self.column_name
+            metrics.append(ElasticMetric(
+                metric_name=name,
+                verbose_name='COUNT(DISTINCT {})'.format(self.column_name),
+                metric_type='count_distinct',
+                json=json.dumps({'cardinality': {'field': self.column_name}})
+            ))
+        session = get_session()
+        for metric in metrics:
+            m = (
+                session.query(M)
+                .filter(M.metric_name == metric.metric_name)
+                # ``datasource_name`` is unique, so it is enough to scope
+                # the lookup; filtering on ElasticCluster without a join
+                # would produce a cartesian product
+                .filter(M.datasource_name == self.datasource_name)
+                .first()
+            )
+            metric.datasource_name = self.datasource_name
+            if not m:
+                session.add(metric)
+                session.flush()
+
+ @classmethod
+ def import_obj(cls, i_column):
+ def lookup_obj(lookup_column):
+ return db.session.query(ElasticColumn).filter(
+ ElasticColumn.datasource_name == lookup_column.datasource_name,
+ ElasticColumn.column_name == lookup_column.column_name).first()
+
+ return import_util.import_simple_obj(db.session, i_column, lookup_obj)
+
+
+class ElasticMetric(Model, BaseMetric):
+
+ """ORM object referencing Elastic metrics for a datasource"""
+
+ __tablename__ = 'elastic_metrics'
+ datasource_name = Column(
+ String(255),
+ ForeignKey('elastic_datasources.datasource_name'))
+ # Setting enable_typechecks=False disables polymorphic inheritance.
+ datasource = relationship(
+ 'ElasticDatasource',
+ backref=backref('metrics', cascade='all, delete-orphan'),
+ enable_typechecks=False)
+ json = Column(Text)
+
+ export_fields = (
+ 'metric_name', 'verbose_name', 'metric_type', 'datasource_name',
+ 'json', 'description', 'is_restricted', 'd3format'
+ )
+
+ @property
+ def expression(self):
+ return self.json
+
+ @property
+ def json_obj(self):
+ try:
+ obj = json.loads(self.json)
+ except Exception:
+ obj = {}
+ return obj
+
+ @property
+ def perm(self):
+ return (
+ "{parent_name}.[{obj.metric_name}](id:{obj.id})"
+ ).format(obj=self,
+ parent_name=self.datasource.full_name
+ ) if self.datasource else None
+
+ @classmethod
+ def import_obj(cls, i_metric):
+ def lookup_obj(lookup_metric):
+ return db.session.query(ElasticMetric).filter(
+ ElasticMetric.datasource_name == lookup_metric.datasource_name,
+ ElasticMetric.metric_name == lookup_metric.metric_name).first()
+ return import_util.import_simple_obj(db.session, i_metric, lookup_obj)
+
+
+class ElasticDatasource(Model, BaseDatasource):
+
+ """ORM object referencing Elastic datasources (tables)"""
+
+ __tablename__ = 'elastic_datasources'
+
+ type = "elastic"
+    query_language = "json"
+ cluster_class = ElasticCluster
+ metric_class = ElasticMetric
+ column_class = ElasticColumn
+
+ baselink = "elasticdatasourcemodelview"
+
+ # Columns
+ datasource_name = Column(String(255), unique=True)
+ is_hidden = Column(Boolean, default=False)
+ fetch_values_from = Column(String(100))
+ cluster_name = Column(
+ String(250), ForeignKey('elastic_clusters.cluster_name'))
+ cluster = relationship(
+ 'ElasticCluster', backref='datasources', foreign_keys=[cluster_name])
+ user_id = Column(Integer, ForeignKey('ab_user.id'))
+ owner = relationship(
+ sm.user_model,
+ backref=backref('elastic_datasources', cascade='all, delete-orphan'),
+ foreign_keys=[user_id])
+
+ export_fields = (
+ 'datasource_name', 'is_hidden', 'description', 'default_endpoint',
+ 'cluster_name', 'offset', 'cache_timeout', 'params'
+ )
+    slices = relationship(
+        'Slice',
+        primaryjoin=(
+            # a python ``and`` inside a primaryjoin string silently drops
+            # one of the two conditions; use sqlalchemy's and_() instead
+            "and_(ElasticDatasource.id == foreign(Slice.datasource_id), "
+            "Slice.datasource_type == 'elastic')"))
+
+ @property
+ def database(self):
+ return self.cluster
+
+ @property
+ def num_cols(self):
+ return [c.column_name for c in self.columns if c.is_num]
+
+ @property
+ def name(self):
+ return self.datasource_name
+
+ @property
+ def schema(self):
+ ds_name = self.datasource_name or ''
+ name_pieces = ds_name.split('.')
+ if len(name_pieces) > 1:
+ return name_pieces[0]
+ else:
+ return None
+
+ @property
+ def schema_perm(self):
+ """Returns schema permission if present, cluster one otherwise."""
+ return utils.get_schema_perm(self.cluster, self.schema)
+
+ def get_perm(self):
+ return (
+ "[{obj.cluster_name}].[{obj.datasource_name}]"
+ "(id:{obj.id})").format(obj=self)
+
+ @property
+ def link(self):
+ name = escape(self.datasource_name)
+        # anchor markup restored; ``url`` is provided by BaseDatasource
+        return Markup('<a href="{self.url}">{name}</a>').format(**locals())
+
+ @property
+ def full_name(self):
+ return utils.get_datasource_full_name(
+ self.cluster_name, self.datasource_name)
+
+ @property
+ def time_column_grains(self):
+ return {
+ "time_columns": [
+ 'all', '5 seconds', '30 seconds', '1 minute',
+ '5 minutes', '1 hour', '6 hour', '1 day', '7 days',
+ 'week', 'week_starting_sunday', 'week_ending_saturday',
+ 'month',
+ ],
+ "time_grains": ['now']
+ }
+
+ def __repr__(self):
+ return self.datasource_name
+
+ @renders('datasource_name')
+ def datasource_link(self):
+ url = "/superset/explore/{obj.type}/{obj.id}/".format(obj=self)
+ name = escape(self.datasource_name)
+        return Markup('<a href="{url}">{name}</a>'.format(**locals()))
+
+ def get_metric_obj(self, metric_name):
+ return [
+ m.json_obj for m in self.metrics
+ if m.metric_name == metric_name
+ ][0]
+
+ @classmethod
+ def import_obj(cls, i_datasource, import_time=None):
+ """Imports the datasource from the object to the database.
+
+        Metrics, columns and the datasource itself are overridden if they
+        already exist. This function can be used to import/export dashboards
+        between multiple superset instances. Audit metadata isn't copied over.
+ """
+ def lookup_datasource(d):
+ return db.session.query(ElasticDatasource).join(ElasticCluster).filter(
+ ElasticDatasource.datasource_name == d.datasource_name,
+ ElasticCluster.cluster_name == d.cluster_name,
+ ).first()
+
+ def lookup_cluster(d):
+ return db.session.query(ElasticCluster).filter_by(
+ cluster_name=d.cluster_name).one()
+ return import_util.import_datasource(
+ db.session, i_datasource, lookup_cluster, lookup_datasource,
+ import_time)
+
+ @staticmethod
+ def version_higher(v1, v2):
+ """is v1 higher than v2
+
+ >>> ElasticDatasource.version_higher('0.8.2', '0.9.1')
+ False
+ >>> ElasticDatasource.version_higher('0.8.2', '0.6.1')
+ True
+ >>> ElasticDatasource.version_higher('0.8.2', '0.8.2')
+ False
+ >>> ElasticDatasource.version_higher('0.8.2', '0.9.BETA')
+ False
+ >>> ElasticDatasource.version_higher('0.8.2', '0.9')
+ False
+ """
+ def int_or_0(v):
+ try:
+ v = int(v)
+ except (TypeError, ValueError):
+ v = 0
+ return v
+ v1nums = [int_or_0(n) for n in v1.split('.')]
+ v2nums = [int_or_0(n) for n in v2.split('.')]
+ v1nums = (v1nums + [0, 0, 0])[:3]
+ v2nums = (v2nums + [0, 0, 0])[:3]
+ return v1nums[0] > v2nums[0] or \
+ (v1nums[0] == v2nums[0] and v1nums[1] > v2nums[1]) or \
+ (v1nums[0] == v2nums[0] and v1nums[1] == v2nums[1] and v1nums[2] > v2nums[2])
+
+ def generate_metrics(self):
+ for col in self.columns:
+ col.generate_metrics()
+
+ def query_str(self):
+ d = {"query": None}
+ return json.dumps(d)
+
+ @classmethod
+ def sync_to_db(cls, name, metadata, cluster):
+ """Fetches metadata for that datasource and merges the Superset db"""
+ logging.info("Syncing Elastic datasource [{}]".format(name))
+ session = get_session()
+ datasource = session.query(cls).filter_by(datasource_name=name).first()
+ if not datasource:
+ datasource = cls(datasource_name=name)
+ session.add(datasource)
+ flasher("Adding new datasource [{}]".format(name), "success")
+ else:
+ flasher("Refreshing datasource [{}]".format(name), "info")
+ session.flush()
+ datasource.cluster = cluster
+ session.flush()
+
+        for col_name, col_metadata in (metadata.get('properties') or {}).items():
+ cls.merge_column(col_name, col_metadata, datasource, session)
+
+ @classmethod
+ def merge_column(cls, col_name, col_metadata, datasource, sesh):
+ col_obj = (
+ sesh
+ .query(ElasticColumn)
+ .filter_by(
+ datasource_name=datasource.datasource_name,
+ column_name=col_name)
+ .first()
+ )
+ datatype = col_metadata.get('type')
+ if not col_obj:
+ col_obj = ElasticColumn(
+ datasource_name=datasource.datasource_name,
+ column_name=col_name)
+ sesh.add(col_obj)
+ if datatype == "string":
+ col_obj.groupby = True
+ col_obj.filterable = True
+ if col_obj.is_num:
+ col_obj.sum = True
+ if col_obj:
+ col_obj.type = datatype
+ sesh.flush()
+ col_obj.datasource = datasource
+ col_obj.generate_metrics()
+ sesh.flush()
+
+ @staticmethod
+ def time_offset(granularity):
+ if granularity == 'week_ending_saturday':
+ return 6 * 24 * 3600 * 1000 # 6 days
+ return 0
+
+ # uses https://en.wikipedia.org/wiki/ISO_8601
+    # the period spec built here mirrors Druid granularities
+    # (http://druid.io/docs/0.8.0/querying/granularities.html)
+ # TODO: pass origin from the UI
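+    # for illustration, granularity('1 hour', timezone='UTC') returns
+    #   {'type': 'period', 'timeZone': 'UTC', 'period': 'PT1H'}
+    # while a raw millisecond duration such as granularity(30000) returns
+    #   {'type': 'duration', 'duration': 30000}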
+ @staticmethod
+ def granularity(period_name, timezone=None, origin=None):
+ if not period_name or period_name == 'all':
+ return 'all'
+ iso_8601_dict = {
+ '5 seconds': 'PT5S',
+ '30 seconds': 'PT30S',
+ '1 minute': 'PT1M',
+ '5 minutes': 'PT5M',
+ '1 hour': 'PT1H',
+ '6 hour': 'PT6H',
+ 'one day': 'P1D',
+ '1 day': 'P1D',
+ '7 days': 'P7D',
+ 'week': 'P1W',
+ 'week_starting_sunday': 'P1W',
+ 'week_ending_saturday': 'P1W',
+ 'month': 'P1M',
+ }
+
+ granularity = {'type': 'period'}
+ if timezone:
+ granularity['timeZone'] = timezone
+
+ if origin:
+ dttm = utils.parse_human_datetime(origin)
+ granularity['origin'] = dttm.isoformat()
+
+ if period_name in iso_8601_dict:
+ granularity['period'] = iso_8601_dict[period_name]
+ if period_name in ('week_ending_saturday', 'week_starting_sunday'):
+ # use Sunday as start of the week
+ granularity['origin'] = '2016-01-03T00:00:00'
+ elif not isinstance(period_name, string_types):
+ granularity['type'] = 'duration'
+ granularity['duration'] = period_name
+ elif period_name.startswith('P'):
+ # identify if the string is the iso_8601 period
+ granularity['period'] = period_name
+ else:
+ granularity['type'] = 'duration'
+ granularity['duration'] = utils.parse_human_timedelta(
+ period_name).total_seconds() * 1000
+ return granularity
+
+ def values_for_column(self,
+ column_name,
+ limit=10000):
+ """Retrieve some values for the given column"""
+ # TODO
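+        # a sketch of one way to implement this with a terms aggregation
+        # (not wired up yet; ``vals`` is an arbitrary aggregation name):
+        #   client = self.cluster.get_client()
+        #   body = {'size': 0, 'aggregations': {
+        #       'vals': {'terms': {'field': column_name, 'size': limit}}}}
+        #   resp = client.search(index=self.index, body=body)
+        #   return [b['key'] for b in
+        #           resp['aggregations']['vals']['buckets']]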
+
+ def get_query_str(self, query_obj, phase=1, client=None):
+ return self.run_query(client=client, phase=phase, **query_obj)
+
+ def run_query( # noqa / elastic
+ self,
+ groupby, metrics,
+ granularity,
+ from_dttm, to_dttm,
+ filter=None, # noqa
+ is_timeseries=True,
+ timeseries_limit=None,
+ timeseries_limit_metric=None,
+ row_limit=None,
+ inner_from_dttm=None, inner_to_dttm=None,
+ orderby=None,
+ extras=None, # noqa
+ select=None, # noqa
+ columns=None, phase=2, client=None, form_data=None):
+ """Runs a query against Elastic and returns a dataframe.
+ """
+ pass
+
+ @property
+ def index(self):
+        # the part before the first dot of ``datasource_name``,
+        # e.g. 'logs.event' -> 'logs'
+        return self.datasource_name.split('.')[0]
+
+ def query(self, query_obj):
+ client = self.cluster.get_client()
+ equery = {}
+
+ # Aggregations
+ equery['aggregations'] = {}
+ for m in self.metrics:
+ if m.metric_name in query_obj.get('metrics'):
+ equery['aggregations'][m.metric_name] = m.json_obj
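+        # at this point ``equery`` is a plain Elasticsearch search body that
+        # only carries aggregations; for a hypothetical ``bytes`` column with
+        # a ``sum__bytes`` metric it would look like:
+        #   {'aggregations': {'sum__bytes': {'sum': {'field': 'bytes'}}}}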
+
+        qry_start_dttm = datetime.now()
+        logging.info("Querying Elastic: %s", equery)
+        data = client.search(index=self.index, body=equery)
+        logging.debug("Elastic response: %s", data)
+        query_str = json.dumps(equery, indent=2)
+        # a minimal dataframe built from the top-level aggregation values,
+        # assuming single-value aggregations (sum/avg/min/max/cardinality)
+        aggs = data.get('aggregations') or {}
+        df = pd.DataFrame(
+            [{k: v.get('value') for k, v in aggs.items()}]) if aggs else None
+ return QueryResult(
+ df=df,
+ query=query_str,
+ duration=datetime.now() - qry_start_dttm)
+
+ def get_filters(self, raw_filters): # noqa
+ return
+
+ @classmethod
+ def query_datasources_by_name(
+ cls, session, database, datasource_name, schema=None):
+ return (
+ session.query(cls)
+            # ``cluster_name`` is a string FK; compare it to the cluster's name
+            .filter_by(cluster_name=database.cluster_name)
+ .filter_by(datasource_name=datasource_name)
+ .all()
+ )
+
+sa.event.listen(ElasticDatasource, 'after_insert', set_perm)
+sa.event.listen(ElasticDatasource, 'after_update', set_perm)
diff --git a/superset/connectors/elastic/views.py b/superset/connectors/elastic/views.py
new file mode 100644
index 0000000000000..604d8c1ecc448
--- /dev/null
+++ b/superset/connectors/elastic/views.py
@@ -0,0 +1,285 @@
+from datetime import datetime
+import logging
+
+import sqlalchemy as sqla
+
+from flask import Markup, flash, redirect
+from flask_appbuilder import CompactCRUDMixin, expose
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+
+from flask_babel import lazy_gettext as _
+from flask_babel import gettext as __
+
+from superset import db, utils, appbuilder, sm, security
+from superset.connectors.connector_registry import ConnectorRegistry
+from superset.utils import has_access
+from superset.views.base import BaseSupersetView
+from superset.views.base import (
+ SupersetModelView, validate_json, DeleteMixin, ListWidgetWithCheckboxes,
+ DatasourceFilter, get_datasource_exist_error_mgs)
+
+from . import models
+
+appbuilder.add_separator("Sources")
+
+
+class ElasticColumnInlineView(CompactCRUDMixin, SupersetModelView): # noqa
+ datamodel = SQLAInterface(models.ElasticColumn)
+ edit_columns = [
+ 'column_name', 'description', 'json', 'datasource',
+ 'groupby', 'filterable', 'count_distinct', 'sum', 'min', 'max']
+ add_columns = edit_columns
+ list_columns = [
+ 'column_name', 'type', 'groupby', 'filterable', 'count_distinct',
+ 'sum', 'min', 'max']
+ can_delete = False
+ page_size = 500
+ label_columns = {
+ 'column_name': _("Column"),
+ 'type': _("Type"),
+ 'datasource': _("Datasource"),
+ 'groupby': _("Groupable"),
+ 'filterable': _("Filterable"),
+ 'count_distinct': _("Count Distinct"),
+ 'sum': _("Sum"),
+ 'min': _("Min"),
+ 'max': _("Max"),
+ }
+ description_columns = {
+ 'filterable': _(
+ "Whether this column is exposed in the `Filters` section "
+ "of the explore view."),
+        'json': utils.markdown(
+            "this field can be used to provide a JSON definition for the "
+            "column, e.g. a scripted field. "
+            "Make sure to input valid JSON.",
+            True),
+ }
+
+ def post_update(self, col):
+ col.generate_metrics()
+ utils.validate_json(col.json)
+
+ def post_add(self, col):
+ self.post_update(col)
+
+appbuilder.add_view_no_menu(ElasticColumnInlineView)
+
+
+class ElasticMetricInlineView(CompactCRUDMixin, SupersetModelView): # noqa
+ datamodel = SQLAInterface(models.ElasticMetric)
+ list_columns = ['metric_name', 'verbose_name', 'metric_type']
+ edit_columns = [
+ 'metric_name', 'description', 'verbose_name', 'metric_type', 'json',
+ 'datasource', 'd3format', 'is_restricted']
+ add_columns = edit_columns
+ page_size = 500
+ validators_columns = {
+ 'json': [validate_json],
+ }
+ description_columns = {
+        'metric_type': utils.markdown(
+            "the type of the metric; the JSON field should hold an "
+            "[Elasticsearch aggregation]"
+            "(https://www.elastic.co/guide/en/elasticsearch/reference/"
+            "current/search-aggregations.html) definition",
+            True),
+ 'is_restricted': _("Whether the access to this metric is restricted "
+ "to certain roles. Only roles with the permission "
+ "'metric access on XXX (the name of this metric)' "
+ "are allowed to access this metric"),
+ }
+ label_columns = {
+ 'metric_name': _("Metric"),
+ 'description': _("Description"),
+ 'verbose_name': _("Verbose Name"),
+ 'metric_type': _("Type"),
+ 'json': _("JSON"),
+ 'datasource': _("Elastic Datasource"),
+ }
+
+ def post_add(self, metric):
+ if metric.is_restricted:
+ security.merge_perm(sm, 'metric_access', metric.get_perm())
+
+ def post_update(self, metric):
+ if metric.is_restricted:
+ security.merge_perm(sm, 'metric_access', metric.get_perm())
+
+appbuilder.add_view_no_menu(ElasticMetricInlineView)
+
+
+class ElasticClusterModelView(SupersetModelView, DeleteMixin): # noqa
+ datamodel = SQLAInterface(models.ElasticCluster)
+ add_columns = [
+ 'cluster_name', 'hosts_json', 'cache_timeout',
+ ]
+ edit_columns = add_columns
+ list_columns = ['cluster_name', 'metadata_last_refreshed']
+ search_columns = ('cluster_name',)
+ label_columns = {
+ 'cluster_name': _("Cluster"),
+ 'hosts_json': _("Hosts JSON configuration")
+ }
+ description_columns = {
+ 'hosts_json': _(
+ "A JSON string that represents a host, and array of host, "
+ "or anything else that ``elasticsearch.Elasticsearch()`` will "
+ "be able to interpret"),
+ }
+
+ def pre_add(self, cluster):
+ security.merge_perm(sm, 'database_access', cluster.perm)
+
+ def pre_update(self, cluster):
+ self.pre_add(cluster)
+
+ def _delete(self, pk):
+ DeleteMixin._delete(self, pk)
+
+appbuilder.add_view(
+ ElasticClusterModelView,
+ name="Elastic Clusters",
+ label=__("Elastic Clusters"),
+ icon="fa-cubes",
+ category="Sources",
+ category_label=__("Sources"),
+ category_icon='fa-database',)
+
+
+class ElasticDatasourceModelView(SupersetModelView, DeleteMixin): # noqa
+ datamodel = SQLAInterface(models.ElasticDatasource)
+ list_widget = ListWidgetWithCheckboxes
+ list_columns = [
+ 'datasource_link', 'cluster', 'changed_by_', 'modified']
+ order_columns = [
+ 'datasource_link', 'changed_on_', 'offset']
+ related_views = [ElasticColumnInlineView, ElasticMetricInlineView]
+ edit_columns = [
+ 'datasource_name', 'cluster', 'slices', 'description', 'owner',
+ 'is_hidden',
+ 'filter_select_enabled', 'fetch_values_from',
+ 'default_endpoint', 'offset', 'cache_timeout']
+ search_columns = (
+ 'datasource_name', 'cluster', 'description', 'owner'
+ )
+ add_columns = edit_columns
+ show_columns = add_columns + ['perm']
+ page_size = 500
+ base_order = ('datasource_name', 'asc')
+ description_columns = {
+ 'slices': _(
+ "The list of slices associated with this table. By "
+ "altering this datasource, you may change how these associated "
+ "slices behave. "
+ "Also note that slices need to point to a datasource, so "
+ "this form will fail at saving if removing slices from a "
+ "datasource. If you want to change the datasource for a slice, "
+ "overwrite the slice from the 'explore view'"),
+ 'offset': _("Timezone offset (in hours) for this datasource"),
+ 'description': Markup(
+ "Supports markdown"),
+ 'fetch_values_from': _(
+ "Time expression to use as a predicate when retrieving "
+ "distinct values to populate the filter component. "
+ "Only applies when `Enable Filter Select` is on. If "
+ "you enter `7 days ago`, the distinct list of values in "
+ "the filter will be populated based on the distinct value over "
+ "the past week"),
+ 'filter_select_enabled': _(
+ "Whether to populate the filter's dropdown in the explore "
+ "view's filter section with a list of distinct values fetched "
+ "from the backend on the fly"),
+ 'default_endpoint': _(
+ "Redirects to this endpoint when clicking on the datasource "
+ "from the datasource list"),
+ }
+ base_filters = [['id', DatasourceFilter, lambda: []]]
+ label_columns = {
+ 'slices': _("Associated Slices"),
+ 'datasource_link': _("Data Source"),
+ 'cluster': _("Cluster"),
+ 'description': _("Description"),
+ 'owner': _("Owner"),
+ 'is_hidden': _("Is Hidden"),
+ 'filter_select_enabled': _("Enable Filter Select"),
+ 'default_endpoint': _("Default Endpoint"),
+ 'offset': _("Time Offset"),
+ 'cache_timeout': _("Cache Timeout"),
+ }
+
+ def pre_add(self, datasource):
+ number_of_existing_datasources = db.session.query(
+ sqla.func.count('*')).filter(
+ models.ElasticDatasource.datasource_name ==
+ datasource.datasource_name,
+            # ``cluster_name`` stores the cluster's name, not its numeric id
+            models.ElasticDatasource.cluster_name == datasource.cluster_name
+ ).scalar()
+
+ # table object is already added to the session
+ if number_of_existing_datasources > 1:
+ raise Exception(get_datasource_exist_error_mgs(
+ datasource.full_name))
+
+ def post_add(self, datasource):
+ datasource.generate_metrics()
+ security.merge_perm(sm, 'datasource_access', datasource.get_perm())
+ if datasource.schema:
+ security.merge_perm(sm, 'schema_access', datasource.schema_perm)
+
+ def post_update(self, datasource):
+ self.post_add(datasource)
+
+ def _delete(self, pk):
+ DeleteMixin._delete(self, pk)
+
+appbuilder.add_view(
+ ElasticDatasourceModelView,
+ "Elastic Datasources",
+ label=__("Elastic Datasources"),
+ category="Sources",
+ category_label=__("Sources"),
+ icon="fa-cube")
+
+
+class Elastic(BaseSupersetView):
+ """The base views for Superset!"""
+
+ @has_access
+ @expose("/refresh_datasources/")
+ def refresh_datasources(self):
+ """endpoint that refreshes elastic datasources metadata"""
+ session = db.session()
+ ElasticCluster = ConnectorRegistry.sources['elastic'].cluster_class
+ for cluster in session.query(ElasticCluster).all():
+ cluster_name = cluster.cluster_name
+ try:
+ cluster.refresh_datasources()
+ except Exception as e:
+ flash(
+ "Error while processing cluster '{}'\n{}".format(
+ cluster_name, utils.error_msg_from_exception(e)),
+ "danger")
+ logging.exception(e)
+ return redirect('/elasticclustermodelview/list/')
+ cluster.metadata_last_refreshed = datetime.now()
+ flash(
+ "Refreshed metadata from cluster "
+ "[" + cluster.cluster_name + "]",
+ 'info')
+ session.commit()
+ return redirect("/elasticdatasourcemodelview/list/")
+
+appbuilder.add_view_no_menu(Elastic)
+
+appbuilder.add_link(
+ "Refresh Elastic Metadata",
+ label=__("Refresh Elastic Metadata"),
+ href='/elastic/refresh_datasources/',
+ category='Sources',
+ category_label=__("Sources"),
+ category_icon='fa-database',
+ icon="fa-cog")
diff --git a/superset/migrations/versions/b97e54338b27_elasticsearch.py b/superset/migrations/versions/b97e54338b27_elasticsearch.py
new file mode 100644
index 0000000000000..e5e5e0456b7d3
--- /dev/null
+++ b/superset/migrations/versions/b97e54338b27_elasticsearch.py
@@ -0,0 +1,112 @@
+"""elasticsearch
+
+Revision ID: b97e54338b27
+Revises: a65458420354
+Create Date: 2017-06-11 22:03:07.505841
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = 'b97e54338b27'
+down_revision = 'a65458420354'
+
+
+def upgrade():
+ op.create_table(
+ 'elastic_clusters',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('cluster_name', sa.String(length=250), nullable=True),
+ sa.Column('hosts_json', sa.Text(), nullable=True),
+ sa.Column('metadata_last_refreshed', sa.DateTime(), nullable=True),
+ sa.Column('cache_timeout', sa.Integer(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.PrimaryKeyConstraint('id'),
+ sa.UniqueConstraint('cluster_name')
+ )
+ op.create_table(
+ 'elastic_datasources',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.Column('default_endpoint', sa.Text(), nullable=True),
+ sa.Column('is_featured', sa.Boolean(), nullable=True),
+ sa.Column('filter_select_enabled', sa.Boolean(), nullable=True),
+ sa.Column('offset', sa.Integer(), nullable=True),
+ sa.Column('cache_timeout', sa.Integer(), nullable=True),
+ sa.Column('params', sa.String(length=1000), nullable=True),
+ sa.Column('perm', sa.String(length=1000), nullable=True),
+ sa.Column('datasource_name', sa.String(length=255), nullable=True),
+ sa.Column('is_hidden', sa.Boolean(), nullable=True),
+ sa.Column('fetch_values_from', sa.String(length=100), nullable=True),
+ sa.Column('cluster_name', sa.String(length=250), nullable=True),
+ sa.Column('user_id', sa.Integer(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['cluster_name'], ['elastic_clusters.cluster_name'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
+ sa.PrimaryKeyConstraint('id'),
+ sa.UniqueConstraint('datasource_name')
+ )
+ op.create_table(
+ 'elastic_columns',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('column_name', sa.String(length=255), nullable=True),
+ sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+ sa.Column('is_active', sa.Boolean(), nullable=True),
+ sa.Column('type', sa.String(length=32), nullable=True),
+ sa.Column('groupby', sa.Boolean(), nullable=True),
+ sa.Column('count_distinct', sa.Boolean(), nullable=True),
+ sa.Column('sum', sa.Boolean(), nullable=True),
+ sa.Column('avg', sa.Boolean(), nullable=True),
+ sa.Column('max', sa.Boolean(), nullable=True),
+ sa.Column('min', sa.Boolean(), nullable=True),
+ sa.Column('filterable', sa.Boolean(), nullable=True),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.Column('datasource_name', sa.String(length=255), nullable=True),
+ sa.Column('json', sa.Text(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['datasource_name'], ['elastic_datasources.datasource_name'], ),
+ sa.PrimaryKeyConstraint('id')
+ )
+ op.create_table(
+ 'elastic_metrics',
+ sa.Column('created_on', sa.DateTime(), nullable=True),
+ sa.Column('changed_on', sa.DateTime(), nullable=True),
+ sa.Column('id', sa.Integer(), nullable=False),
+ sa.Column('metric_name', sa.String(length=512), nullable=True),
+ sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+ sa.Column('metric_type', sa.String(length=32), nullable=True),
+ sa.Column('description', sa.Text(), nullable=True),
+ sa.Column('is_restricted', sa.Boolean(), nullable=True),
+ sa.Column('d3format', sa.String(length=128), nullable=True),
+ sa.Column('datasource_name', sa.String(length=255), nullable=True),
+ sa.Column('json', sa.Text(), nullable=True),
+ sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+ sa.Column('created_by_fk', sa.Integer(), nullable=True),
+ sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+ sa.ForeignKeyConstraint(['datasource_name'], ['elastic_datasources.datasource_name'], ),
+ sa.PrimaryKeyConstraint('id')
+ )
+
+
+def downgrade():
+ op.drop_table('elastic_metrics')
+ op.drop_table('elastic_columns')
+ op.drop_table('elastic_datasources')
+ op.drop_table('elastic_clusters')
diff --git a/superset/views/core.py b/superset/views/core.py
index 06a0c1397a7f8..50ef4276e1116 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2281,7 +2281,6 @@ class CssTemplateModelView(SupersetModelView, DeleteMixin):
class CssTemplateAsyncModelView(CssTemplateModelView):
list_columns = ['template_name', 'css']
-appbuilder.add_separator("Sources")
appbuilder.add_view(
CssTemplateModelView,
"CSS Templates",