Skip to content

Commit

Permalink
Sample processing should be rate-limited.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marina Samuel committed Apr 17, 2019
1 parent 2fc50af commit 82546b0
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 20 deletions.
25 changes: 25 additions & 0 deletions migrations/versions/6adb92e75691_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Add sample_updated_at column to table_metadata
Revision ID: 6adb92e75691
Revises: 280daa582976
Create Date: 2019-04-10 20:13:13.714589
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '6adb92e75691'
down_revision = '280daa582976'
branch_labels = None
depends_on = None


def upgrade():
    """Add the nullable, timezone-aware ``sample_updated_at`` column to
    ``table_metadata`` (tracks when a table's data sample was last taken)."""
    sample_updated_at = sa.Column(
        'sample_updated_at',
        sa.DateTime(timezone=True),
        nullable=True,
    )
    op.add_column('table_metadata', sample_updated_at)


def downgrade():
    """Reverse :func:`upgrade` by dropping ``sample_updated_at`` again."""
    op.drop_column(table_name='table_metadata', column_name='sample_updated_at')
2 changes: 2 additions & 0 deletions redash/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TableMetadata(TimestampMixin, db.Model):
description = Column(db.String(4096), nullable=True)
column_metadata = Column(db.Boolean, default=False)
sample_query = Column("sample_query", db.Text, nullable=True)
sample_updated_at = Column(db.DateTime(True), nullable=True)

__tablename__ = 'table_metadata'

Expand All @@ -93,6 +94,7 @@ def to_dict(self):
'description': self.description,
'column_metadata': self.column_metadata,
'sample_query': self.sample_query,
'sample_updated_at': self.sample_updated_at,
}


Expand Down
5 changes: 4 additions & 1 deletion redash/query_runner/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ def format(self, operation, parameters=None):

class Athena(BaseQueryRunner):
noop_query = 'SELECT 1'
sample_query = "SELECT * FROM {table} LIMIT 1"

# This takes a 1% random sample from {table}, reducing
# the runtime and data scanned for the query
sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"

@classmethod
def name(cls):
Expand Down
5 changes: 4 additions & 1 deletion redash/query_runner/presto.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ class Presto(BaseQueryRunner):
'title': 'Show Data Samples'
},
}
sample_query = "SELECT * FROM {table} LIMIT 1"

# This takes a 1% random sample from {table}, reducing
# the runtime and data scanned for the query
sample_query = "SELECT * FROM {table} TABLESAMPLE SYSTEM (1) LIMIT 1"

@classmethod
def configuration_schema(cls):
Expand Down
4 changes: 4 additions & 0 deletions redash/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def all_settings():

SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 360))
SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "celery")
SAMPLES_REFRESH_QUEUE = os.environ.get("REDASH_SAMPLES_REFRESH_QUEUE", "celery")

AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key")
ENFORCE_HTTPS = parse_boolean(os.environ.get("REDASH_ENFORCE_HTTPS", "false"))
Expand Down Expand Up @@ -255,6 +256,9 @@ def all_settings():
# Frequency of clearing out old schema metadata.
SCHEMA_METADATA_TTL_DAYS = int(os.environ.get("REDASH_SCHEMA_METADATA_TTL_DAYS", 60))

# Frequency of schema samples refresh
SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS = int(os.environ.get("REDASH_SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS", 14))

# Allow Parameters in Embeds
# WARNING: With this option enabled, Redash reads query parameters from the request URL (risk of SQL injection!)
ALLOW_PARAMETERS_IN_EMBEDS = parse_boolean(os.environ.get("REDASH_ALLOW_PARAMETERS_IN_EMBEDS", "false"))
Expand Down
2 changes: 1 addition & 1 deletion redash/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .general import record_event, version_check, send_mail, sync_user_details
from .queries import QueryTask, refresh_queries, refresh_schemas, refresh_schema, cleanup_query_results, execute_query, get_table_sample_data, cleanup_schema_metadata
from .queries import QueryTask, refresh_queries, refresh_schemas, refresh_schema, cleanup_query_results, execute_query, update_sample, cleanup_schema_metadata, refresh_samples
from .alerts import check_alerts_for_query
66 changes: 56 additions & 10 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import datetime

import redis
from celery import group
from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from six import text_type
from sqlalchemy.orm import load_only
from sqlalchemy import or_

from redash import models, redis_connection, settings, statsd_client, utils
from redash.models import TableMetadata, ColumnMetadata, db
Expand Down Expand Up @@ -242,8 +244,14 @@ def truncate_long_string(original_str, max_length):
return new_str


@celery.task(name="redash.tasks.get_table_sample_data")
def get_table_sample_data(existing_columns, data_source_id, table_name, table_id):
@celery.task(name="redash.tasks.update_sample")
def update_sample(data_source_id, table_name, table_id):
"""
For a given table, find look up a sample row for it and update
the "example" fields for it in the column_metadata table.
"""
logger.info(u"task=update_sample state=start table_name=%s", table_name)
start_time = time.time()
ds = models.DataSource.get_by_id(data_source_id)
sample = None
try:
Expand All @@ -255,7 +263,7 @@ def get_table_sample_data(existing_columns, data_source_id, table_name, table_id
return

persisted_columns = ColumnMetadata.query.filter(
ColumnMetadata.name.in_(existing_columns),
ColumnMetadata.exists.is_(True),
ColumnMetadata.table_id == table_id,
).options(load_only('id')).all()

Expand All @@ -277,14 +285,56 @@ def get_table_sample_data(existing_columns, data_source_id, table_name, table_id
column_examples
)
models.db.session.commit()
logger.info(u"task=update_sample state=finished table_name=%s runtime=%.2f",
table_name, time.time() - start_time)


@celery.task(name="redash.tasks.refresh_samples")
def refresh_samples(data_source_id, table_sample_limit):
    """
    For a given data source, refresh the data samples stored for each
    table. At most ``table_sample_limit`` tables are processed per call
    (rate limiting); only tables with no sample yet, or a sample older
    than SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS, are selected.

    :param data_source_id: id of the models.DataSource to sample.
    :param table_sample_limit: maximum number of tables to sample per run.
    """
    logger.info(u"task=refresh_samples state=start ds_id=%s", data_source_id)
    ds = models.DataSource.get_by_id(data_source_id)

    # Sampling is opt-in per data source; bail out early when disabled.
    if not ds.query_runner.configuration.get('samples', False):
        return

    refresh_cutoff = (
        utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_SAMPLE_REFRESH_FREQUENCY_DAYS))

    # Find existing tables of THIS data source with a missing or stale sample.
    # NOTE(review): the original query omitted the data_source_id filter, so a
    # per-source run could pick up (and stamp) tables belonging to other
    # sources and queue them against the wrong query runner.
    tables_to_sample = TableMetadata.query.filter(
        TableMetadata.exists.is_(True),
        TableMetadata.data_source_id == ds.id,
        or_(
            TableMetadata.sample_updated_at.is_(None),
            TableMetadata.sample_updated_at < refresh_cutoff
        )
    ).limit(table_sample_limit).all()

    tasks = []
    for table in tables_to_sample:
        tasks.append(
            update_sample.signature(
                args=(ds.id, table.name, table.id),
                # Use the dedicated samples queue (SAMPLES_REFRESH_QUEUE is
                # introduced for exactly this purpose) instead of the schemas
                # queue, so sample work can be isolated from schema refreshes.
                queue=settings.SAMPLES_REFRESH_QUEUE
            )
        )
        # Stamp immediately so the next refresh_samples call selects a
        # different batch, even before the queued tasks have completed.
        table.sample_updated_at = db.func.now()
        models.db.session.add(table)

    group(tasks).apply_async()
    models.db.session.commit()


def cleanup_data_in_table(table_model):
TTL_DAYS_AGO = (
utils.utcnow() - datetime.timedelta(days=settings.SCHEMA_METADATA_TTL_DAYS))

table_model.query.filter(
table_model.exists == False,
table_model.exists.is_(False),
table_model.updated_at < TTL_DAYS_AGO
).delete()

Expand Down Expand Up @@ -420,12 +470,6 @@ def refresh_schema(data_source_id):

existing_columns_list = list(existing_columns_set)

if ds.query_runner.configuration.get('samples', False):
get_table_sample_data.apply_async(
args=(existing_columns_list, ds.id, table.name, table.id),
queue=settings.SCHEMAS_REFRESH_QUEUE
)

# If a column did not exist, set the 'column_exists' flag to false.
ColumnMetadata.query.filter(
ColumnMetadata.exists.is_(True),
Expand Down Expand Up @@ -469,6 +513,7 @@ def refresh_schemas():
"""
Refreshes the data sources schemas.
"""
TABLE_SAMPLE_LIMIT = 50
blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id]
global_start_time = time.time()

Expand All @@ -483,6 +528,7 @@ def refresh_schemas():
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id)
else:
refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
refresh_samples.apply_async(args=(ds.id, TABLE_SAMPLE_LIMIT), queue=settings.SAMPLES_REFRESH_QUEUE)

logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time)

Expand Down
104 changes: 97 additions & 7 deletions tests/tasks/test_refresh_schemas.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import copy
import datetime

from mock import patch
from tests import BaseTestCase

from redash import models
from redash.tasks import refresh_schemas, refresh_schema, get_table_sample_data
from redash import models, utils
from redash.tasks import (refresh_schemas, refresh_schema,
update_sample, refresh_samples)
from redash.models import TableMetadata, ColumnMetadata


Expand Down Expand Up @@ -43,6 +45,9 @@ def setUp(self):
self.addCleanup(get_table_sample_patcher.stop)
patched_get_table_sample.return_value = {self.COLUMN_NAME: self.COLUMN_EXAMPLE}

def tearDown(self):
    # Reset the 'samples' flag that some tests enable, so subsequent tests
    # start from the default (sampling disabled) data source configuration.
    self.factory.data_source.query_runner.configuration['samples'] = False

def test_calls_refresh_of_all_data_sources(self):
self.factory.data_source # trigger creation
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
Expand Down Expand Up @@ -71,12 +76,12 @@ def test_refresh_schema_creates_tables(self):
'sample_query': None,
'description': None,
'column_metadata': True,
'data_source_id': 1
'data_source_id': 1,
'sample_updated_at': None,
}

refresh_schema(self.factory.data_source.id)
get_table_sample_data(
[self.COLUMN_NAME],
update_sample(
self.factory.data_source.id,
'table',
1
Expand Down Expand Up @@ -171,8 +176,7 @@ def test_refresh_schema_update_column(self):
UPDATED_COLUMN_TYPE = 'varchar'

refresh_schema(self.factory.data_source.id)
get_table_sample_data(
[self.COLUMN_NAME],
update_sample(
self.factory.data_source.id,
'table',
1
Expand All @@ -188,3 +192,89 @@ def test_refresh_schema_update_column(self):
column_metadata = ColumnMetadata.query.all()
self.assertNotEqual(column_metadata[0].to_dict(), self.EXPECTED_COLUMN_METADATA)
self.assertEqual(column_metadata[0].to_dict()['type'], UPDATED_COLUMN_TYPE)

def test_refresh_samples_rate_limits(self):
    """refresh_samples processes at most table_sample_limit tables per call."""
    NEW_COLUMN_NAME = 'new_column'
    NUM_TABLES = 105

    tables = [{
        'name': 'table{}'.format(i),
        'columns': [NEW_COLUMN_NAME],
        'metadata': [{
            'name': NEW_COLUMN_NAME,
            'type': self.COLUMN_TYPE,
        }],
    } for i in range(NUM_TABLES)]

    self.patched_get_schema.return_value = tables
    self.factory.data_source.query_runner.configuration['samples'] = True

    refresh_schema(self.factory.data_source.id)

    def unsampled_count():
        # Tables whose sample_updated_at is still NULL have not been sampled.
        return TableMetadata.query.filter(
            TableMetadata.sample_updated_at.is_(None)
        ).count()

    # First call: 50 of the 105 tables are processed.
    refresh_samples(self.factory.data_source.id, 50)
    self.assertEqual(TableMetadata.query.count(), NUM_TABLES)
    self.assertEqual(unsampled_count(), 55)

    # Second call: 50 more tables are processed.
    refresh_samples(self.factory.data_source.id, 50)
    self.assertEqual(unsampled_count(), 5)

    # Third call: the remaining 5 tables are processed.
    refresh_samples(self.factory.data_source.id, 50)
    self.assertEqual(unsampled_count(), 0)

def test_refresh_samples_refreshes(self):
    """Stale samples (older than the refresh frequency) are re-sampled."""
    NEW_COLUMN_NAME = 'new_column'
    NUM_TABLES = 5
    TIME_BEFORE_UPDATE = utils.utcnow()

    tables = [{
        'name': 'table{}'.format(i),
        'columns': [NEW_COLUMN_NAME],
        'metadata': [{
            'name': NEW_COLUMN_NAME,
            'type': self.COLUMN_TYPE,
        }],
    } for i in range(NUM_TABLES)]

    self.patched_get_schema.return_value = tables
    self.factory.data_source.query_runner.configuration['samples'] = True

    refresh_schema(self.factory.data_source.id)
    refresh_samples(self.factory.data_source.id, 50)

    # All 5 tables were sampled and stamped after TIME_BEFORE_UPDATE.
    sampled = TableMetadata.query.filter(
        TableMetadata.sample_updated_at.isnot(None)
    )
    self.assertEqual(sampled.count(), NUM_TABLES)
    self.assertTrue(sampled.first().sample_updated_at > TIME_BEFORE_UPDATE)

    # Age every sample past the refresh window so all become stale.
    sampled.update({
        'sample_updated_at': utils.utcnow() - datetime.timedelta(days=30)
    })
    models.db.session.commit()

    # A second run must refresh the stale samples with a new timestamp.
    TIME_BEFORE_UPDATE = utils.utcnow()
    refresh_samples(self.factory.data_source.id, 50)
    resampled = TableMetadata.query.filter(
        TableMetadata.sample_updated_at.isnot(None)
    )
    self.assertTrue(resampled.first().sample_updated_at > TIME_BEFORE_UPDATE)

0 comments on commit 82546b0

Please sign in to comment.