diff --git a/client/app/assets/less/ant.less b/client/app/assets/less/ant.less
index 5983963524..c42a83e2a7 100644
--- a/client/app/assets/less/ant.less
+++ b/client/app/assets/less/ant.less
@@ -14,7 +14,6 @@
 @import '~antd/lib/radio/style/index';
 @import '~antd/lib/time-picker/style/index';
 @import '~antd/lib/pagination/style/index';
-@import '~antd/lib/drawer/style/index';
 @import '~antd/lib/table/style/index';
 @import '~antd/lib/popover/style/index';
 @import '~antd/lib/icon/style/index';
diff --git a/client/app/assets/less/inc/schema-browser.less b/client/app/assets/less/inc/schema-browser.less
index d547a78790..0034391086 100644
--- a/client/app/assets/less/inc/schema-browser.less
+++ b/client/app/assets/less/inc/schema-browser.less
@@ -7,14 +7,14 @@ div.table-name {
   border-radius: @redash-radius;
   position: relative;
 
-  .copy-to-editor, .info {
+  .copy-to-editor {
     display: none;
   }
 
   &:hover {
     background: fade(@redash-gray, 10%);
 
-    .copy-to-editor, .info {
+    .copy-to-editor {
       display: flex;
     }
   }
@@ -36,7 +36,7 @@ div.table-name {
     background: transparent;
   }
 
-  .copy-to-editor, .info {
+  .copy-to-editor {
     color: fade(@redash-gray, 90%);
     cursor: pointer;
     position: absolute;
@@ -49,10 +49,6 @@ div.table-name {
     justify-content: center;
   }
 
-  .info {
-    right: 20px
-  }
-
   .table-open {
     padding: 0 22px 0 26px;
     overflow: hidden;
@@ -60,14 +56,14 @@ div.table-name {
     white-space: nowrap;
     position: relative;
 
-    .copy-to-editor, .info {
+    .copy-to-editor {
       display: none;
     }
 
    &:hover {
      background: fade(@redash-gray, 10%);
 
-      .copy-to-editor, .info {
+      .copy-to-editor {
        display: flex;
      }
    }
diff --git a/client/app/components/proptypes.js b/client/app/components/proptypes.js
index 465d721b5c..7567727fac 100644
--- a/client/app/components/proptypes.js
+++ b/client/app/components/proptypes.js
@@ -11,13 +11,6 @@ export const DataSource = PropTypes.shape({
   type_name: PropTypes.string,
 });
 
-export const DataSourceMetadata = PropTypes.shape({
-  key: PropTypes.number,
-  name: PropTypes.string,
-  type: PropTypes.string,
-  example: PropTypes.string,
-});
-
 export const Table = PropTypes.shape({
   columns: PropTypes.arrayOf(PropTypes.string).isRequired,
 });
diff --git a/client/app/components/queries/SchemaData.jsx b/client/app/components/queries/SchemaData.jsx
deleted file mode 100644
index b17600676e..0000000000
--- a/client/app/components/queries/SchemaData.jsx
+++ /dev/null
@@ -1,65 +0,0 @@
-import React from 'react';
-import PropTypes from 'prop-types';
-import { react2angular } from 'react2angular';
-import Drawer from 'antd/lib/drawer';
-import Table from 'antd/lib/table';
-
-import { DataSourceMetadata } from '@/components/proptypes';
-
-class SchemaData extends React.PureComponent {
-  static propTypes = {
-    show: PropTypes.bool.isRequired,
-    onClose: PropTypes.func.isRequired,
-    tableName: PropTypes.string,
-    tableMetadata: PropTypes.arrayOf(DataSourceMetadata),
-  };
-
-  static defaultProps = {
-    tableName: '',
-    tableMetadata: [],
-  };
-
-  render() {
-    const columns = [{
-      title: 'Column Name',
-      dataIndex: 'name',
-      width: 400,
-      key: 'name',
-    }, {
-      title: 'Column Type',
-      dataIndex: 'type',
-      width: 400,
-      key: 'type',
-    }, {
-      title: 'Example',
-      dataIndex: 'example',
-      width: 400,
-      key: 'example',
-    }];
-
-    return (
-      <Drawer title={this.props.tableName} onClose={this.props.onClose} visible={this.props.show}>
-        <Table dataSource={this.props.tableMetadata} columns={columns} />
-      </Drawer>
-    );
-  }
-}
-
-export default function init(ngModule) {
-  ngModule.component('schemaData', react2angular(SchemaData, null, []));
-}
-
-init.init = true;
diff --git a/client/app/components/queries/schema-browser.html b/client/app/components/queries/schema-browser.html
index ea46c741f2..6e3f518059 100644
--- a/client/app/components/queries/schema-browser.html
+++ b/client/app/components/queries/schema-browser.html
@@ -16,23 +16,15 @@
       <strong>
         <span title="{{table.name}}">{{table.name}}</span>
         <span ng-if="table.size !== undefined"> ({{table.size}})</span>
       </strong>
       <i class="fa fa-angle-double-right copy-to-editor" aria-hidden="true"
          ng-click="$ctrl.itemSelected($event, [table.name])"></i>
-      <i class="fa fa-question-circle info" aria-hidden="true"
-         ng-click="openSchemaInfo($event, table.name, table.columns)"></i>
     </div>
     <div ng-show="table.collapsed">
-      <div ng-repeat="column in table.columns track by $index" class="table-open">{{column.name}}
+      <div ng-repeat="column in table.columns track by $index" class="table-open">{{column}}
         <i class="fa fa-angle-double-right copy-to-editor" aria-hidden="true"
-           ng-click="$ctrl.itemSelected($event, [column.name])"></i>
+           ng-click="$ctrl.itemSelected($event, [column])"></i>
       </div>
     </div>
   </div>
-  <schema-data show="showSchemaInfo" table-name="tableName" table-metadata="tableMetadata" on-close="closeSchemaInfo"></schema-data>
 </div>
diff --git a/client/app/components/queries/schema-browser.js b/client/app/components/queries/schema-browser.js
index dd509dfa2f..34615aa590 100644
--- a/client/app/components/queries/schema-browser.js
+++ b/client/app/components/queries/schema-browser.js
@@ -8,17 +8,6 @@ function SchemaBrowserCtrl($rootScope, $scope) {
     $scope.$broadcast('vsRepeatTrigger');
   };
 
-  $scope.showSchemaInfo = false;
-  $scope.openSchemaInfo = ($event, tableName, tableMetadata) => {
-    $scope.tableName = tableName;
-    $scope.tableMetadata = tableMetadata;
-    $scope.showSchemaInfo = true;
-    $event.stopPropagation();
-  };
-  $scope.closeSchemaInfo = () => {
-    $scope.$apply(() => { $scope.showSchemaInfo = false; });
-  };
-
   this.getSize = (table) => {
     let size = 22;
diff --git a/migrations/versions/280daa582976_.py b/migrations/versions/280daa582976_.py
deleted file mode 100644
index c3e8162241..0000000000
--- a/migrations/versions/280daa582976_.py
+++ /dev/null
@@ -1,55 +0,0 @@
-"""Add column metadata and table metadata
-
-Revision ID: 280daa582976
-Revises: e5c7a4e2df4d
-Create Date: 2019-01-24 18:23:53.040608
-
-"""
-from alembic import op
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision = '280daa582976'
-down_revision = 'e5c7a4e2df4d'
-branch_labels = None
-depends_on = None
-
-
-def upgrade():
-    op.create_table(
-        'table_metadata',
-        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
-        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
-        sa.Column('id', sa.Integer(), nullable=False),
-        sa.Column('org_id', sa.Integer(), nullable=False),
-        sa.Column('data_source_id', sa.Integer(), nullable=False),
-        sa.Column('exists', sa.Boolean(), nullable=False),
-        sa.Column('name', sa.String(length=255), nullable=False),
-        sa.Column('description', sa.String(length=4096), nullable=True),
-        sa.Column('column_metadata', sa.Boolean(), nullable=False),
-        sa.Column('sample_query', sa.Text(), nullable=True),
-        sa.ForeignKeyConstraint(['data_source_id'], ['data_sources.id'], ondelete="CASCADE"),
-        sa.ForeignKeyConstraint(['org_id'], ['organizations.id']),
-        sa.PrimaryKeyConstraint('id')
-    )
-    op.create_table(
-        'column_metadata',
-        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=False),
-        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
-        sa.Column('id', sa.Integer(), nullable=False),
-        sa.Column('org_id', sa.Integer(), nullable=False),
-        sa.Column('table_id', sa.Integer(), nullable=False),
-        sa.Column('name', sa.String(length=255), nullable=False),
-        sa.Column('type', sa.String(length=255), nullable=True),
-        sa.Column('example', sa.String(length=4096), nullable=True),
-        sa.Column('exists', sa.Boolean(), nullable=False),
-        sa.ForeignKeyConstraint(['table_id'], ['table_metadata.id'], ondelete="CASCADE"),
-        sa.ForeignKeyConstraint(['org_id'], ['organizations.id']),
-        sa.PrimaryKeyConstraint('id')
-    )
-
-
-def downgrade():
-    op.drop_table('column_metadata')
-    op.drop_table('table_metadata')
diff --git a/redash/handlers/data_sources.py b/redash/handlers/data_sources.py
index c64165ad4d..65532ee509 100644
--- a/redash/handlers/data_sources.py
+++ b/redash/handlers/data_sources.py
@@ -6,11 +6,10 @@
 from six import text_type
 from sqlalchemy.exc import IntegrityError
 
-from redash import models, settings
+from redash import models
 from redash.handlers.base import BaseResource, get_object_or_404
 from redash.permissions import (require_access, require_admin,
                                 require_permission, view_only)
-from redash.tasks.queries import refresh_schemas
 from redash.query_runner import (get_configuration_schema_for_query_runner_type,
                                  query_runners,
                                  NotSupported)
 from redash.utils import filter_none
@@ -53,9 +52,6 @@ def post(self, data_source_id):
         data_source.name = req['name']
         models.db.session.add(data_source)
 
-        # Refresh the stored schemas when a data source is updated
-        refresh_schemas.apply_async(queue=settings.SCHEMAS_REFRESH_QUEUE)
-
         try:
             models.db.session.commit()
         except IntegrityError as e:
@@ -133,9 +129,6 @@ def post(self):
                                                  options=config)
             models.db.session.commit()
-
-            # Refresh the stored schemas when a new data source is added to the list
-            refresh_schemas.apply_async(queue=settings.SCHEMAS_REFRESH_QUEUE)
         except IntegrityError as e:
             if req['name'] in e.message:
                 abort(400, message="Data source with the name {} already exists.".format(req['name']))
@@ -158,10 +151,9 @@ def get(self, data_source_id):
         refresh = request.args.get('refresh') is not None
 
         response = {}
+
         try:
-            if refresh:
-                refresh_schemas.apply(queue=settings.SCHEMAS_REFRESH_QUEUE)
-            response['schema'] = data_source.get_schema()
+            response['schema'] = data_source.get_schema(refresh)
         except NotSupported:
             response['error'] = {
                 'code': 1,
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
index c40ef26450..1d9b5b11a2 100644
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ -7,7 +7,6 @@
 import pytz
 import xlsxwriter
-from operator import itemgetter
 from six import python_2_unicode_compatible, text_type
 from sqlalchemy import distinct, or_, and_, UniqueConstraint
 from sqlalchemy.dialects import postgresql
@@ -66,62 +65,6 @@ def get(self, query_id):
 
 scheduled_queries_executions = ScheduledQueriesExecutions()
 
 
-@python_2_unicode_compatible
-class TableMetadata(TimestampMixin, db.Model):
-    id = Column(db.Integer, primary_key=True)
-    org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
-    data_source_id = Column(db.Integer, db.ForeignKey("data_sources.id", ondelete="CASCADE"))
-    exists = Column(db.Boolean, default=True)
-    name = Column(db.String(255))
-    description = Column(db.String(4096), nullable=True)
-    column_metadata = Column(db.Boolean, default=False)
-    sample_query = Column("sample_query", db.Text, nullable=True)
-
-    __tablename__ = 'table_metadata'
-
-    def __str__(self):
-        return text_type(self.name)
-
-    def to_dict(self):
-        return {
-            'id': self.id,
-            'org_id': self.org_id,
-            'data_source_id': self.data_source_id,
-            'exists': self.exists,
-            'name': self.name,
-            'description': self.description,
-            'column_metadata': self.column_metadata,
-            'sample_query': self.sample_query,
-        }
-
-
-@python_2_unicode_compatible
-class ColumnMetadata(TimestampMixin, db.Model):
-    id = Column(db.Integer, primary_key=True)
-    org_id = Column(db.Integer, db.ForeignKey("organizations.id"))
-    table_id = Column(db.Integer, db.ForeignKey("table_metadata.id", ondelete="CASCADE"))
-    name = Column(db.String(255))
-    type = Column(db.String(255), nullable=True)
-    example = Column(db.String(4096), nullable=True)
-    exists = Column(db.Boolean, default=True)
-
-    __tablename__ = 'column_metadata'
-
-    def __str__(self):
-        return text_type(self.name)
-
-    def to_dict(self):
-        return {
-            'id': self.id,
-            'org_id': self.org_id,
-            'table_id': self.table_id,
-            'name': self.name,
-            'type': self.type,
-            'example': self.example,
-            'exists': self.exists,
-        }
-
-
 @python_2_unicode_compatible
 @generic_repr('id', 'name', 'type', 'org_id', 'created_at')
 class DataSource(BelongsToOrgMixin, db.Model):
@@ -202,29 +145,22 @@ def delete(self):
         db.session.commit()
         return res
 
-    def get_schema(self):
-        schema = []
-        tables = TableMetadata.query.filter(TableMetadata.data_source_id == self.id).all()
-        for table in tables:
-            if not table.exists:
-                continue
+    def get_schema(self, refresh=False):
+        key = "data_source:schema:{}".format(self.id)
+
+        cache = None
+        if not refresh:
+            cache = redis_connection.get(key)
+
+        if cache is None:
+            query_runner = self.query_runner
+            schema = sorted(query_runner.get_schema(get_stats=refresh), key=lambda t: t['name'])
+
+            redis_connection.set(key, json_dumps(schema))
+        else:
+            schema = json_loads(cache)
 
-            table_info = {
-                'name': table.name,
-                'exists': table.exists,
-                'hasColumnMetadata': table.column_metadata,
-                'columns': []}
-
-            columns = ColumnMetadata.query.filter(ColumnMetadata.table_id == table.id)
-            table_info['columns'] = sorted([{
-                'key': column.id,
-                'name': column.name,
-                'type': column.type,
-                'exists': column.exists,
-                'example': column.example
-            } for column in columns if column.exists == True], key=itemgetter('name'))
-            schema.append(table_info)
-
-        return sorted(schema, key=itemgetter('name'))
+        return schema
 
     def _pause_key(self):
         return 'ds:{}:pause'.format(self.id)
diff --git a/redash/query_runner/__init__.py b/redash/query_runner/__init__.py
index 4b76f54cdc..c252d841b4 100644
--- a/redash/query_runner/__init__.py
+++ b/redash/query_runner/__init__.py
@@ -54,7 +54,6 @@ class NotSupported(Exception):
 
 class BaseQueryRunner(object):
     noop_query = None
-    data_sample_query = None
 
     def __init__(self, configuration):
         self.syntax = 'sql'
@@ -119,27 +118,6 @@ def _run_query_internal(self, query):
             raise Exception("Failed running query [%s]." % query)
         return json_loads(results)['rows']
 
-    def get_table_sample(self, table_name):
-        if not self.configuration.get('samples', False):
-            return {}
-
-        if self.data_sample_query is None:
-            raise NotImplementedError()
-
-        query = self.data_sample_query.format(table=table_name)
-
-        results, error = self.run_query(query, None)
-        if error is not None:
-            raise NotSupported()
-
-        rows = json_loads(results).get('rows', [])
-        if len(rows) > 0:
-            sample = rows[0]
-        else:
-            sample = {}
-
-        return sample
-
     @classmethod
     def to_dict(cls):
         return {
diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py
index 36792b0d62..8d4f3dfcff 100644
--- a/redash/query_runner/athena.py
+++ b/redash/query_runner/athena.py
@@ -43,7 +43,6 @@ def format(self, operation, parameters=None):
 
 class Athena(BaseQueryRunner):
     noop_query = 'SELECT 1'
-    data_sample_query = "SELECT * FROM {table} LIMIT 1"
 
     @classmethod
     def name(cls):
@@ -79,10 +78,6 @@ def configuration_schema(cls):
                 'type': 'boolean',
                 'title': 'Use Glue Data Catalog',
             },
-            'samples': {
-                'type': 'boolean',
-                'title': 'Show Data Samples'
-            },
         },
         'required': ['region', 's3_staging_dir'],
         'order': ['region', 'aws_access_key', 'aws_secret_key', 's3_staging_dir', 'schema'],
@@ -148,7 +143,7 @@ def get_schema(self, get_stats=False):
         schema = {}
         query = """
-        SELECT table_schema, table_name, column_name, data_type AS column_type
+        SELECT table_schema, table_name, column_name
         FROM information_schema.columns
         WHERE table_schema NOT IN ('information_schema')
         """
@@ -158,17 +153,11 @@ def get_schema(self, get_stats=False):
             raise Exception("Failed getting schema.")
 
         results = json_loads(results)
-
-        for i, row in enumerate(results['rows']):
+        for row in results['rows']:
             table_name = '{0}.{1}'.format(row['table_schema'], row['table_name'])
             if table_name not in schema:
-                schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
-
+                schema[table_name] = {'name': table_name, 'columns': []}
             schema[table_name]['columns'].append(row['column_name'])
-            schema[table_name]['metadata'].append({
-                "name": row['column_name'],
-                "type": row['column_type'],
-            })
 
         return schema.values()
diff --git a/redash/query_runner/mysql.py b/redash/query_runner/mysql.py
index eb805944ed..bfd6e7198e 100644
--- a/redash/query_runner/mysql.py
+++ b/redash/query_runner/mysql.py
@@ -28,7 +28,6 @@ class Mysql(BaseSQLQueryRunner):
     noop_query = "SELECT 1"
-    data_sample_query = "SELECT * FROM {table} LIMIT 1"
 
     @classmethod
     def configuration_schema(cls):
@@ -55,11 +54,7 @@ def configuration_schema(cls):
                 'port': {
                     'type': 'number',
                     'default': 3306,
-                },
-                'samples': {
-                    'type': 'boolean',
-                    'title': 'Show Data Samples'
-                },
+                }
             },
             "order": ['host', 'port', 'user', 'passwd', 'db'],
             'required': ['db'],
@@ -105,8 +100,7 @@ def _get_tables(self, schema):
         query = """
         SELECT col.table_schema as table_schema,
                col.table_name as table_name,
-               col.column_name as column_name,
-               col.data_type AS column_type
+               col.column_name as column_name
         FROM `information_schema`.`columns` col
         WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
         """
@@ -118,20 +112,16 @@ def _get_tables(self, schema):
 
         results = json_loads(results)
 
-        for i, row in enumerate(results['rows']):
+        for row in results['rows']:
             if row['table_schema'] != self.configuration['db']:
                 table_name = u'{}.{}'.format(row['table_schema'], row['table_name'])
             else:
                 table_name = row['table_name']
 
             if table_name not in schema:
-                schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
+                schema[table_name] = {'name': table_name, 'columns': []}
 
             schema[table_name]['columns'].append(row['column_name'])
-            schema[table_name]['metadata'].append({
-                "name": row['column_name'],
-                "type": row['column_type'],
-            })
 
         return schema.values()
diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py
index 9ebd3c796a..048f5880f0 100644
--- a/redash/query_runner/pg.py
+++ b/redash/query_runner/pg.py
@@ -67,7 +67,6 @@ def _wait(conn, timeout=None):
 
 class PostgreSQL(BaseSQLQueryRunner):
     noop_query = "SELECT 1"
-    data_sample_query = "SELECT * FROM {table} LIMIT 1"
 
     @classmethod
     def configuration_schema(cls):
@@ -96,11 +95,7 @@ def configuration_schema(cls):
                 "type": "string",
                 "title": "SSL Mode",
                 "default": "prefer"
-            },
-            "samples": {
-                "type": "boolean",
-                "title": "Show Data Samples"
-            },
+            }
         },
         "order": ['host', 'port', 'user', 'password'],
         "required": ["dbname"],
@@ -126,13 +121,9 @@ def _get_definitions(self, schema, query):
             table_name = row['table_name']
 
             if table_name not in schema:
-                schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
+                schema[table_name] = {'name': table_name, 'columns': []}
 
             schema[table_name]['columns'].append(row['column_name'])
-            schema[table_name]['metadata'].append({
-                "name": row['column_name'],
-                "type": row['column_type'],
-            })
 
     def _get_tables(self, schema):
         '''
@@ -152,8 +143,7 @@ def _get_tables(self, schema):
         query = """
         SELECT s.nspname as table_schema,
                c.relname as table_name,
-               a.attname as column_name,
-               a.atttypid::regtype as column_type
+               a.attname as column_name
         FROM pg_class c
         JOIN pg_namespace s
         ON c.relnamespace = s.oid
@@ -261,11 +251,7 @@ def configuration_schema(cls):
                 "type": "string",
                 "title": "SSL Mode",
                 "default": "prefer"
-            },
-            "samples": {
-                "type": "boolean",
-                "title": "Show Data Samples"
-            },
+            }
         },
         "order": ['host', 'port', 'user', 'password'],
         "required": ["dbname", "user", "password", "host", "port"],
@@ -285,12 +271,11 @@ def _get_tables(self, schema):
         SELECT DISTINCT table_name,
                         table_schema,
                         column_name,
-                        data_type AS column_type,
                         ordinal_position AS pos
         FROM svv_columns
         WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema')
         )
-        SELECT table_name, table_schema, column_name, column_type
+        SELECT table_name, table_schema, column_name
         FROM tables
         WHERE
             HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND
diff --git a/redash/query_runner/presto.py b/redash/query_runner/presto.py
index 0c851cf3b0..975ea70c07 100644
--- a/redash/query_runner/presto.py
+++ b/redash/query_runner/presto.py
@@ -31,7 +31,6 @@
 class Presto(BaseQueryRunner):
     noop_query = 'SHOW TABLES'
-    data_sample_query = "SELECT * FROM {table} LIMIT 1"
 
     @classmethod
     def configuration_schema(cls):
@@ -57,10 +56,6 @@ def configuration_schema(cls):
                 'username': {
                     'type': 'string'
                 },
-                'samples': {
-                    'type': 'boolean',
-                    'title': 'Show Data Samples'
-                },
             },
             'order': ['host', 'protocol', 'port', 'username', 'schema', 'catalog'],
             'required': ['host']
@@ -77,12 +72,13 @@ def type(cls):
     def get_schema(self, get_stats=False):
         schema = {}
         query = """
-        SELECT table_schema, table_name, column_name, data_type AS column_type
+        SELECT table_schema, table_name, column_name
         FROM information_schema.columns
         WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
         """
 
         results, error = self.run_query(query, None)
+
         if error is not None:
             raise Exception("Failed getting schema.")
@@ -90,14 +86,11 @@ def get_schema(self, get_stats=False):
         for row in results['rows']:
             table_name = '{}.{}'.format(row['table_schema'], row['table_name'])
+
             if table_name not in schema:
-                schema[table_name] = {'name': table_name, 'columns': [], 'metadata': []}
+                schema[table_name] = {'name': table_name, 'columns': []}
 
             schema[table_name]['columns'].append(row['column_name'])
-            schema[table_name]['metadata'].append({
-                "name": row['column_name'],
-                "type": row['column_type'],
-            })
 
         return schema.values()
diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py
index 2550287d28..f95cfa0906 100644
--- a/redash/settings/__init__.py
+++ b/redash/settings/__init__.py
@@ -259,9 +259,6 @@ def all_settings():
 # Enhance schema fetching
 SCHEMA_RUN_TABLE_SIZE_CALCULATIONS = parse_boolean(os.environ.get("REDASH_SCHEMA_RUN_TABLE_SIZE_CALCULATIONS", "false"))
 
-# Frequency of clearing out old schema metadata.
-SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60))
-
 # Allow Parameters in Embeds
 # WARNING: With this option enabled, Redash reads query parameters from the request URL (risk of SQL injection!)
 ALLOW_PARAMETERS_IN_EMBEDS = parse_boolean(os.environ.get("REDASH_ALLOW_PARAMETERS_IN_EMBEDS", "false"))
diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py
index d04f42b3cc..e5de680381 100644
--- a/redash/tasks/__init__.py
+++ b/redash/tasks/__init__.py
@@ -1,3 +1,3 @@
 from .general import record_event, version_check, send_mail, sync_user_details
-from .queries import QueryTask, refresh_queries, refresh_schemas, refresh_schema, cleanup_query_results, execute_query, get_table_sample_data, cleanup_schema_metadata
+from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query
 from .alerts import check_alerts_for_query
diff --git a/redash/tasks/queries.py b/redash/tasks/queries.py
index 9c97a97e2a..a83ab3242e 100644
--- a/redash/tasks/queries.py
+++ b/redash/tasks/queries.py
@@ -1,17 +1,14 @@
 import logging
 import signal
 import time
-import datetime
 
 import redis
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.result import AsyncResult
 from celery.utils.log import get_task_logger
 from six import text_type
-from sqlalchemy.orm import load_only
 
-from redash import models, redis_connection, settings, statsd_client, utils
-from redash.models import TableMetadata, ColumnMetadata, db
+from redash import models, redis_connection, settings, statsd_client
 from redash.query_runner import InterruptException
 from redash.tasks.alerts import check_alerts_for_query
 from redash.utils import gen_query_hash, json_dumps, json_loads, utcnow, mustache_render
@@ -232,143 +229,13 @@ def cleanup_query_results():
 
     logger.info("Deleted %d unused query results.", deleted_count)
 
 
-@celery.task(name="redash.tasks.get_table_sample_data")
-def get_table_sample_data(data_source_id, table, table_id):
-    ds = models.DataSource.get_by_id(data_source_id)
-    sample = ds.query_runner.get_table_sample(table['name'])
-    if not sample:
-        return
-
-    # If a column exists, add a sample to it.
-    for i, column in enumerate(table['columns']):
-        persisted_column = ColumnMetadata.query.filter(
-            ColumnMetadata.name == column,
-            ColumnMetadata.table_id == table_id,
-        ).options(load_only('id')).first()
-
-        if persisted_column:
-            column_example = str(sample.get(column, None))
-            if column_example and len(column_example) > 4000:
-                column_example = u'{}...'.format(column_example[:4000])
-
-            ColumnMetadata.query.filter(
-                ColumnMetadata.id == persisted_column.id,
-            ).update({
-                'example': column_example,
-            })
-    models.db.session.commit()
-
-
-def cleanup_data_in_table(table_model):
-    removed_metadata = table_model.query.filter(
-        table_model.exists == False,
-    ).options(load_only('updated_at'))
-
-    for removed_metadata_row in removed_metadata:
-        is_old_data = (
-            utils.utcnow() - removed_metadata_row.updated_at
-        ) > datetime.timedelta(days=settings.SCHEMA_METADATA_TTL_DAYS)
-
-        if not is_old_data:
-            continue
-
-        table_model.query.filter(
-            table_model.id == removed_metadata_row.id,
-        ).delete()
-
-    db.session.commit()
-
-
-@celery.task(name="redash.tasks.cleanup_schema_metadata")
-def cleanup_schema_metadata():
-    cleanup_data_in_table(TableMetadata)
-    cleanup_data_in_table(ColumnMetadata)
-
-
 @celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60)
 def refresh_schema(data_source_id):
     ds = models.DataSource.get_by_id(data_source_id)
     logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id)
     start_time = time.time()
-
     try:
-        existing_tables = set()
-        schema = ds.query_runner.get_schema(get_stats=True)
-
-        for table in schema:
-            table_name = table['name']
-            existing_tables.add(table_name)
-
-            # Assume that only one table with a given name exists per data source, so use first()
-            persisted_table = TableMetadata.query.filter(
-                TableMetadata.name == table_name,
-                TableMetadata.data_source_id == ds.id,
-            ).first()
-
-            if persisted_table:
-                TableMetadata.query.filter(
-                    TableMetadata.id == persisted_table.id,
-                ).update({"exists": True})
-            else:
-                metadata = 'metadata' in table
-                persisted_table = TableMetadata(
-                    org_id=ds.org_id,
-                    name=table_name,
-                    data_source_id=ds.id,
-                    column_metadata=metadata
-                )
-                models.db.session.add(persisted_table)
-                models.db.session.flush()
-
-            existing_columns = set()
-            for i, column in enumerate(table['columns']):
-                existing_columns.add(column)
-                column_metadata = {
-                    'org_id': ds.org_id,
-                    'table_id': persisted_table.id,
-                    'name': column,
-                    'type': None,
-                    'example': None,
-                    'exists': True
-                }
-                if 'metadata' in table:
-                    column_metadata['type'] = table['metadata'][i]['type']
-
-                # If the column exists, update it, otherwise create a new one.
-                persisted_column = ColumnMetadata.query.filter(
-                    ColumnMetadata.name == column,
-                    ColumnMetadata.table_id == persisted_table.id,
-                ).options(load_only('id')).first()
-
-                if persisted_column:
-                    ColumnMetadata.query.filter(
-                        ColumnMetadata.id == persisted_column.id,
-                    ).update(column_metadata)
-                else:
-                    models.db.session.add(ColumnMetadata(**column_metadata))
-            models.db.session.commit()
-
-            get_table_sample_data.apply_async(
-                args=(data_source_id, table, persisted_table.id),
-                queue=settings.SCHEMAS_REFRESH_QUEUE
-            )
-
-            # If a column did not exist, set the 'column_exists' flag to false.
-            existing_columns_list = tuple(existing_columns)
-            ColumnMetadata.query.filter(
-                ColumnMetadata.exists == True,
-                ColumnMetadata.table_id == persisted_table.id,
-                ~ColumnMetadata.name.in_(existing_columns_list),
-            ).update({
-                "exists": False,
-                "updated_at": db.func.now()
-            }, synchronize_session='fetch')
-
-        # If a table did not exist in the get_schema() response above, set the 'exists' flag to false.
-        existing_tables_list = tuple(existing_tables)
-        tables_to_update = TableMetadata.query.filter(
-            TableMetadata.exists == True,
-            TableMetadata.data_source_id == ds.id,
-            ~TableMetadata.name.in_(existing_tables_list)
-        ).update({
-            "exists": False,
-            "updated_at": db.func.now()
-        }, synchronize_session='fetch')
-
-        models.db.session.commit()
-
+        ds.get_schema(refresh=True)
         logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
         statsd_client.incr('refresh_schema.success')
     except SoftTimeLimitExceeded:
diff --git a/redash/worker.py b/redash/worker.py
index d90c1279a4..23c3d7b6fe 100644
--- a/redash/worker.py
+++ b/redash/worker.py
@@ -27,10 +27,6 @@
     'sync_user_details': {
         'task': 'redash.tasks.sync_user_details',
         'schedule': timedelta(minutes=1),
-    },
-    'cleanup_schema_metadata': {
-        'task': 'redash.tasks.cleanup_schema_metadata',
-        'schedule': timedelta(days=3),
     }
 }
diff --git a/tests/factories.py b/tests/factories.py
index 4abba0afd3..2c82e186da 100644
--- a/tests/factories.py
+++ b/tests/factories.py
@@ -79,15 +79,6 @@ def __call__(self):
                                 data_source=data_source_factory.create,
                                 org_id=1)
 
-table_metadata_factory = ModelFactory(redash.models.TableMetadata,
-                                      data_source_id=1,
-                                      exists=True,
-                                      name='table')
-
-column_metadata_factory = ModelFactory(redash.models.ColumnMetadata,
-                                       table_id=1,
-                                       name='column')
-
 query_with_params_factory = ModelFactory(redash.models.Query,
                                          name='New Query with Params',
                                          description='',
@@ -185,12 +176,6 @@ def create_org(self, **kwargs):
 
         return org
 
-    def create_table_metadata(self, **kwargs):
-        return table_metadata_factory.create(**kwargs)
-
-    def create_column_metadata(self, **kwargs):
-        return column_metadata_factory.create(**kwargs)
-
     def create_user(self, **kwargs):
         args = {
             'org': self.org,
diff --git a/tests/models/test_data_sources.py b/tests/models/test_data_sources.py
index 429cbbd5f4..037ff77a05 100644
--- a/tests/models/test_data_sources.py
+++ b/tests/models/test_data_sources.py
@@ -1,3 +1,4 @@
+import mock
 from tests import BaseTestCase
 
 from redash.models import DataSource, Query, QueryResult
@@ -6,43 +7,38 @@ class DataSourceTest(BaseTestCase):
 
     def test_get_schema(self):
-        data_source = self.factory.create_data_source()
+        return_value = [{'name': 'table', 'columns': []}]
+
+        with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
+            patched_get_schema.return_value = return_value
+
+            schema = self.factory.data_source.get_schema()
+
+            self.assertEqual(return_value, schema)
+
+    def test_get_schema_uses_cache(self):
+        return_value = [{'name': 'table', 'columns': []}]
+        with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
+            patched_get_schema.return_value = return_value
+
+            self.factory.data_source.get_schema()
+            schema = self.factory.data_source.get_schema()
+
+            self.assertEqual(return_value, schema)
+            self.assertEqual(patched_get_schema.call_count, 1)
+
+    def test_get_schema_skips_cache_with_refresh_true(self):
+        return_value = [{'name': 'table', 'columns': []}]
+        with mock.patch('redash.query_runner.pg.PostgreSQL.get_schema') as patched_get_schema:
+            patched_get_schema.return_value = return_value
+
+            self.factory.data_source.get_schema()
+            new_return_value = [{'name': 'new_table', 'columns': []}]
+            patched_get_schema.return_value = new_return_value
+            schema = self.factory.data_source.get_schema(refresh=True)
 
-        # Create an existing table with a non-existing column
-        table_metadata = self.factory.create_table_metadata(
-            data_source_id=data_source.id,
-            org_id=data_source.org_id
-        )
-        column_metadata = self.factory.create_column_metadata(
-            table_id=table_metadata.id,
-            org_id=data_source.org_id,
-            type='boolean',
-            example=True,
-            exists=False
-        )
-
-        # Create a non-existing table with an existing column
-        table_metadata = self.factory.create_table_metadata(
-            data_source_id=data_source.id,
-            org_id=data_source.org_id,
-            name='table_doesnt_exist',
-            exists=False
-        )
-        column_metadata = self.factory.create_column_metadata(
-            table_id=table_metadata.id,
-            org_id=data_source.org_id,
-            type='boolean',
-            example=True,
-        )
-
-        return_value = [{
-            'name': 'table',
-            'hasColumnMetadata': False,
-            'exists': True,
-            'columns': []
-        }]
-        schema = data_source.get_schema()
-        self.assertEqual(return_value, schema)
+            self.assertEqual(new_return_value, schema)
+            self.assertEqual(patched_get_schema.call_count, 2)
 
 
 class TestDataSourceCreate(BaseTestCase):
diff --git a/tests/query_runner/test_get_schema_format.py b/tests/query_runner/test_get_schema_format.py
deleted file mode 100644
index 7ebbf6eb56..0000000000
--- a/tests/query_runner/test_get_schema_format.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import json
-import mock
-
-from unittest import TestCase
-
-from redash.query_runner.presto import Presto
-from redash.query_runner.athena import Athena
-from redash.query_runner.mysql import Mysql
-from redash.query_runner.pg import PostgreSQL, Redshift
-
-
-class TestBaseQueryRunner(TestCase):
-    def setUp(self):
-        self.query_runners = [{
-            'instance': Presto({}),
-            'mock_location': 'presto.Presto'
-        }, {
-            'instance': Athena({}),
-            'mock_location': 'athena.Athena'
-        }, {
-            'instance': Mysql({'db': None}),
-            'mock_location': 'mysql.Mysql'
-        }, {
-            'instance': PostgreSQL({}),
-            'mock_location': 'pg.PostgreSQL'
-        }, {
-            'instance': Redshift({}),
-            'mock_location': 'pg.Redshift'
-        }]
-
-    def _setup_mock(self, function_to_patch):
-        patcher = mock.patch(function_to_patch)
-        patched_function = patcher.start()
-        self.addCleanup(patcher.stop)
-        return patched_function
-
-    def assert_correct_schema_format(self, query_runner, mock_location):
-        EXPECTED_SCHEMA_RESULT = [{
-            'columns': ['created_date'],
-            'metadata': [{
-                'name': 'created_date',
-                'type': 'varchar',
-            }],
-            'name': 'default.table_name'
-        }]
-
-        get_schema_query_response = {
-            "rows": [{
-                "table_schema": "default",
-                "table_name": "table_name",
-                "column_type": "varchar",
-                "column_name": "created_date"
-            }]
-        }
-        get_samples_query_response = {
-            "rows": [{
-                "created_date": "2017-10-26"
-            }]
-        }
-
-        self.run_count = 0
-        def query_runner_responses(query, user):
-            response = (json.dumps(get_schema_query_response), None)
-            if self.run_count > 0:
-                response = (json.dumps(get_samples_query_response), None)
-            self.run_count += 1
-            return response
-
-        self.patched_run_query = self._setup_mock(
-            'redash.query_runner.{location}.run_query'.format(location=mock_location))
-        self.patched_run_query.side_effect = query_runner_responses
-
-        schema = query_runner.get_schema()
-        self.assertEqual(schema, EXPECTED_SCHEMA_RESULT)
-
-    def test_get_schema_format(self):
-        for runner in self.query_runners:
-            self.assert_correct_schema_format(runner['instance'], runner['mock_location'])
diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py
index a0afc7367d..257c2b5481 100644
--- a/tests/tasks/test_queries.py
+++ b/tests/tasks/test_queries.py
@@ -3,14 +3,11 @@
 import uuid
 
 import mock
-import datetime
 
 from tests import BaseTestCase
-from redash import redis_connection, models, utils
-from redash.models import TableMetadata
+from redash import redis_connection, models
 from redash.query_runner.pg import PostgreSQL
-from redash.tasks.queries import (QueryExecutionError, enqueue_query,
-                                  execute_query, cleanup_data_in_table)
+from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query
@@ -117,24 +114,3 @@ def test_success_after_failure(self):
                           scheduled_query_id=q.id)
         q = models.Query.get_by_id(q.id)
         self.assertEqual(q.schedule_failures, 0)
-
-
-class TestPruneSchemaMetadata(BaseTestCase):
-
-    def test_cleanup_data_in_table(self):
-        data_source = self.factory.create_data_source()
-
-        # Create a non-existent table that is past the cleanup TTL
-        table_metadata = self.factory.create_table_metadata(
-            data_source_id=data_source.id,
-            org_id=data_source.org_id,
-            exists=False,
-            updated_at=(utils.utcnow() - datetime.timedelta(days=70))
-        )
-        all_tables = TableMetadata.query.all()
-        self.assertEqual(len(all_tables), 1)
-
-        cleanup_data_in_table(TableMetadata)
-
-        all_tables = TableMetadata.query.all()
-        self.assertEqual(len(all_tables), 0)
diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py
index 201dd9cecb..df29f5f207 100644
--- a/tests/tasks/test_refresh_schemas.py
+++ b/tests/tasks/test_refresh_schemas.py
@@ -1,49 +1,10 @@
-import copy
-
 from mock import patch
 
 from tests import BaseTestCase
-from redash import models
-from redash.tasks import refresh_schemas, refresh_schema, get_table_sample_data
-from redash.models import TableMetadata, ColumnMetadata
+from redash.tasks import refresh_schemas
 
 
 class TestRefreshSchemas(BaseTestCase):
-    def setUp(self):
-        super(TestRefreshSchemas, self).setUp()
-
-        self.COLUMN_NAME = 'first_column'
-        self.COLUMN_TYPE = 'text'
-        self.COLUMN_EXAMPLE = 'some text for column value'
-        self.EXPECTED_COLUMN_METADATA = {
-            'id': 1,
-            'org_id': 1,
-            'table_id': 1,
-            'name': self.COLUMN_NAME,
-            'type': self.COLUMN_TYPE,
-            'example': self.COLUMN_EXAMPLE,
-            'exists': True,
-        }
-
-        get_schema_patcher = patch('redash.query_runner.pg.PostgreSQL.get_schema')
-        self.patched_get_schema = get_schema_patcher.start()
-        self.addCleanup(get_schema_patcher.stop)
-        self.default_schema_return_value = [{
-            'name': 'table',
-            'columns': [self.COLUMN_NAME],
-            'metadata': [{
-                'name': self.COLUMN_NAME,
-                'type': self.COLUMN_TYPE,
-            }]
-        }]
-        self.patched_get_schema.return_value = self.default_schema_return_value
-
-        get_table_sample_patcher = patch('redash.query_runner.BaseQueryRunner.get_table_sample')
-        patched_get_table_sample = get_table_sample_patcher.start()
-        self.addCleanup(get_table_sample_patcher.stop)
-        patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}
-
     def test_calls_refresh_of_all_data_sources(self):
         self.factory.data_source  # trigger creation
         with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
@@ -62,101 +23,3 @@ def test_skips_paused_data_sources(self):
         with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
             refresh_schemas()
             refresh_job.assert_called()
-
-    def test_refresh_schema_creates_tables(self):
-        EXPECTED_TABLE_METADATA = {
-            'id': 1,
-            'org_id': 1,
-            'exists': True,
-            'name': 'table',
-            'sample_query': None,
-            'description': None,
-            'column_metadata': True,
-            'data_source_id': 1
-        }
-
-        refresh_schema(self.factory.data_source.id)
-        get_table_sample_data(
-            self.factory.data_source.id, {
-                "name": 'table',
-                "columns": [self.COLUMN_NAME]
-            }, 1
-        )
-        table_metadata = TableMetadata.query.all()
-        column_metadata = ColumnMetadata.query.all()
-
-        self.assertEqual(len(table_metadata), 1)
-        self.assertEqual(len(column_metadata), 1)
-        self.assertEqual(table_metadata[0].to_dict(), EXPECTED_TABLE_METADATA)
-        self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
-
-    def test_refresh_schema_deleted_table_marked(self):
-        refresh_schema(self.factory.data_source.id)
-        table_metadata = TableMetadata.query.all()
-        column_metadata = ColumnMetadata.query.all()
-
-        self.assertEqual(len(table_metadata), 1)
-        self.assertEqual(len(column_metadata), 1)
-        self.assertTrue(table_metadata[0].to_dict()['exists'])
-
-        # Table is gone, `exists` should be False.
-        self.patched_get_schema.return_value = []
-
-        refresh_schema(self.factory.data_source.id)
-        table_metadata = TableMetadata.query.all()
-        column_metadata = ColumnMetadata.query.all()
-
-        self.assertEqual(len(table_metadata), 1)
-        self.assertEqual(len(column_metadata), 1)
-        self.assertFalse(table_metadata[0].to_dict()['exists'])
-
-        # Table is back, `exists` should be True again.
-        self.patched_get_schema.return_value = self.default_schema_return_value
-        refresh_schema(self.factory.data_source.id)
-        table_metadata = TableMetadata.query.all()
-        self.assertTrue(table_metadata[0].to_dict()['exists'])
-
-    def test_refresh_schema_delete_column(self):
-        NEW_COLUMN_NAME = 'new_column'
-        refresh_schema(self.factory.data_source.id)
-        column_metadata = ColumnMetadata.query.all()
-
-        self.assertTrue(column_metadata[0].to_dict()['exists'])
-
-        self.patched_get_schema.return_value = [{
-            'name': 'table',
-            'columns': [NEW_COLUMN_NAME],
-            'metadata': [{
-                'name': NEW_COLUMN_NAME,
-                'type': self.COLUMN_TYPE,
-            }]
-        }]
-
-        refresh_schema(self.factory.data_source.id)
-        column_metadata = ColumnMetadata.query.all()
-        self.assertEqual(len(column_metadata), 2)
-
-        self.assertFalse(column_metadata[1].to_dict()['exists'])
-        self.assertTrue(column_metadata[0].to_dict()['exists'])
-
-    def test_refresh_schema_update_column(self):
-        UPDATED_COLUMN_TYPE = 'varchar'
-
-        refresh_schema(self.factory.data_source.id)
-        get_table_sample_data(
-            self.factory.data_source.id, {
-                "name": 'table',
-                "columns": [self.COLUMN_NAME]
-            }, 1
-        )
-        column_metadata = ColumnMetadata.query.all()
-        self.assertEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
-
-        updated_schema = copy.deepcopy(self.default_schema_return_value)
-        updated_schema[0]['metadata'][0]['type'] = UPDATED_COLUMN_TYPE
-        self.patched_get_schema.return_value = updated_schema
-
-        refresh_schema(self.factory.data_source.id)
-        column_metadata = ColumnMetadata.query.all()
-        self.assertNotEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
-        self.assertEqual(column_metadata[0].to_dict()['type'], UPDATED_COLUMN_TYPE)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 7b68df122c..25ab3ef5c9 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -16,7 +16,7 @@ def test_interactive_new(self):
         result = runner.invoke(
             manager,
             ['ds', 'new'],
-            input="test\n%s\n\n\nexample.com\n\n\n\ntestdb\n" % (pg_i,))
+            input="test\n%s\n\n\nexample.com\n\n\ntestdb\n" % (pg_i,))
         self.assertFalse(result.exception)
         self.assertEqual(result.exit_code, 0)
         self.assertEqual(DataSource.query.count(), 1)