chore: log cache keys to the logs (apache#10678)
* Log cache keys to the logs

* Add tests

* Use separate table for the cache keys

* Add migration for the cache lookup table

Co-authored-by: bogdan kyryliuk <bogdankyryliuk@dropbox.com>
2 people authored and auxten committed Nov 20, 2020
1 parent eb2d096 commit 8a96bd0
Showing 8 changed files with 153 additions and 35 deletions.
19 changes: 9 additions & 10 deletions superset/common/query_context.py
@@ -32,6 +32,7 @@
from superset.stats_logger import BaseStatsLogger
from superset.utils import core as utils
from superset.utils.core import DTTM_ALIAS
from superset.viz import set_and_log_cache

config = app.config
stats_logger: BaseStatsLogger = config["STATS_LOGGER"]
@@ -272,16 +273,14 @@ def get_df_payload(  # pylint: disable=too-many-locals,too-many-statements
stacktrace = utils.get_stacktrace()

if is_loaded and cache_key and cache and status != utils.QueryStatus.FAILED:
try:
cache_value = dict(dttm=cached_dttm, df=df, query=query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex: # pylint: disable=broad-except
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key %s", cache_key)
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)
return {
"cache_key": cache_key,
"cached_dttm": cache_value["dttm"] if cache_value is not None else None,
53 changes: 53 additions & 0 deletions superset/migrations/versions/175ea3592453_cache_lookup.py
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Add cache to datasource lookup table.
Revision ID: 175ea3592453
Revises: f80a3b88324b
Create Date: 2020-08-28 17:16:57.379425
"""

# revision identifiers, used by Alembic.
revision = "175ea3592453"
down_revision = "f80a3b88324b"

import sqlalchemy as sa
from alembic import op


def upgrade():
op.create_table(
"cache_keys",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("cache_key", sa.String(256), nullable=False),
sa.Column("cache_timeout", sa.Integer(), nullable=True),
sa.Column("datasource_uid", sa.String(64), nullable=False),
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_cache_keys_datasource_uid"),
"cache_keys",
["datasource_uid"],
unique=False,
)


def downgrade():
op.drop_index(op.f("ix_cache_keys_datasource_uid"), table_name="cache_keys")
op.drop_table("cache_keys")
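
A quick sanity check after applying this migration (typically via the usual `superset db upgrade`) is to confirm the new table and its index exist. The snippet below is a sketch only, not part of this commit; it assumes a Flask app context so that `superset.db` is bound to the metadata database.

# Sketch: verify the cache_keys table created by 175ea3592453_cache_lookup.py.
from sqlalchemy import inspect

from superset import db

inspector = inspect(db.engine)
assert "cache_keys" in inspector.get_table_names()
# The migration also adds an index on datasource_uid for per-datasource lookups.
assert any(
    idx["column_names"] == ["datasource_uid"]
    for idx in inspector.get_indexes("cache_keys")
)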
32 changes: 32 additions & 0 deletions superset/models/cache.py
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime

from flask_appbuilder import Model
from sqlalchemy import Column, DateTime, Integer, String


class CacheKey(Model): # pylint: disable=too-few-public-methods

"""Stores cache key records for the superset visualization."""

__tablename__ = "cache_keys"
id = Column(Integer, primary_key=True)
cache_key = Column(String(256), nullable=False)
cache_timeout = Column(Integer, nullable=True)
datasource_uid = Column(String(64), nullable=False, index=True)
created_on = Column(DateTime, default=datetime.now, nullable=True)
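
A minimal sketch of how these records could be used, for example to evict every cached payload recorded for one datasource. This is illustrative only and not part of the commit; it assumes a Flask app context with the usual `db` and `cache` objects imported from `superset`.

from superset import cache, db
from superset.models.cache import CacheKey


def evict_datasource_cache(datasource_uid: str) -> int:
    """Delete cached payloads (and their lookup rows) for one datasource."""
    entries = (
        db.session.query(CacheKey)
        .filter(CacheKey.datasource_uid == datasource_uid)
        .all()
    )
    for entry in entries:
        # Best effort: the cache backend may have already expired the key.
        cache.delete(entry.cache_key)
        db.session.delete(entry)
    db.session.commit()
    return len(entries)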
1 change: 0 additions & 1 deletion superset/utils/log.py
@@ -169,7 +169,6 @@ def log(  # pylint: disable=too-many-locals
user_id=user_id,
)
logs.append(log)

try:
sesh = current_app.appbuilder.get_session
sesh.bulk_save_objects(logs)
49 changes: 38 additions & 11 deletions superset/viz.py
@@ -53,14 +53,15 @@
from geopy.point import Point
from pandas.tseries.frequencies import to_offset

from superset import app, cache, security_manager
from superset import app, cache, db, security_manager
from superset.constants import NULL_STRING
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
NullValueException,
QueryObjectValidationError,
SpatialException,
)
from superset.models.cache import CacheKey
from superset.models.helpers import QueryResult
from superset.typing import QueryObjectDict, VizData, VizPayload
from superset.utils import core as utils
@@ -95,6 +96,34 @@
]


def set_and_log_cache(
cache_key: str,
df: pd.DataFrame,
query: str,
cached_dttm: str,
cache_timeout: int,
datasource_uid: Optional[str],
) -> None:
try:
cache_value = dict(dttm=cached_dttm, df=df, query=query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=cache_timeout)

if datasource_uid:
ck = CacheKey(
cache_key=cache_key,
cache_timeout=cache_timeout,
datasource_uid=datasource_uid,
)
db.session.add(ck)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)


class BaseViz:

"""All visualizations derive this base class"""
@@ -536,16 +565,14 @@ def get_df_payload(
and cache
and self.status != utils.QueryStatus.FAILED
):
try:
cache_value = dict(dttm=cached_dttm, df=df, query=self.query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
self.query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)
return {
"cache_key": self._any_cache_key,
"cached_dttm": self._any_cached_dttm,
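
For reference, a hedged usage sketch of the new `set_and_log_cache` helper with hypothetical values (the call sites in this diff pass the same arguments positionally). It assumes a Flask app context so the module-level `cache`, `stats_logger`, and `db.session` used inside the helper are available.

from datetime import datetime

import pandas as pd

from superset.viz import set_and_log_cache

df = pd.DataFrame({"name": ["Jennifer"], "sum__num": [1]})  # toy payload for illustration
set_and_log_cache(
    cache_key="viz_cache_example_key",          # hypothetical cache key
    df=df,
    query="SELECT name, SUM(num) FROM birth_names GROUP BY name",
    cached_dttm=datetime.utcnow().isoformat(),
    cache_timeout=600,                          # seconds
    datasource_uid="3__table",                  # the uid asserted in the tests below
)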
22 changes: 10 additions & 12 deletions superset/viz_sip38.py
@@ -43,10 +43,9 @@
from flask import request
from flask_babel import lazy_gettext as _
from geopy.point import Point
from markdown import markdown
from pandas.tseries.frequencies import to_offset

from superset import app, cache, get_manifest_files, security_manager
from superset import app, cache, security_manager
from superset.constants import NULL_STRING
from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
from superset.exceptions import (
@@ -63,6 +62,7 @@
merge_extra_filters,
to_adhoc,
)
from superset.viz import set_and_log_cache

if TYPE_CHECKING:
from superset.connectors.base.models import BaseDatasource
@@ -521,16 +521,14 @@ def get_df_payload(
and cache
and self.status != utils.QueryStatus.FAILED
):
try:
cache_value = dict(dttm=cached_dttm, df=df, query=self.query)
stats_logger.incr("set_cache_key")
cache.set(cache_key, cache_value, timeout=self.cache_timeout)
except Exception as ex:
# cache.set call can fail if the backend is down or if
# the key is too large or whatever other reasons
logger.warning("Could not cache key {}".format(cache_key))
logger.exception(ex)
cache.delete(cache_key)
set_and_log_cache(
cache_key,
df,
self.query,
cached_dttm,
self.cache_timeout,
self.datasource.uid,
)

return {
"cache_key": self._any_cache_key,
7 changes: 7 additions & 0 deletions tests/core_tests.py
@@ -35,6 +35,7 @@
import pandas as pd
import sqlalchemy as sqla

from superset.models.cache import CacheKey
from tests.test_app import app # isort:skip
import superset.views.utils
from superset import (
@@ -583,6 +584,12 @@ def test_warm_up_cache(self):
+ quote(json.dumps([{"col": "name", "op": "in", "val": ["Jennifer"]}]))
) == [{"slice_id": slc.id, "viz_error": None, "viz_status": "success"}]

def test_cache_logging(self):
slc = self.get_slice("Girls", db.session)
self.get_json_resp("/superset/warm_up_cache?slice_id={}".format(slc.id))
ck = db.session.query(CacheKey).order_by(CacheKey.id.desc()).first()
assert ck.datasource_uid == "3__table"

def test_shortner(self):
self.login(username="admin")
data = (
5 changes: 4 additions & 1 deletion tests/query_context_tests.py
@@ -18,6 +18,7 @@
from superset import db
from superset.charts.schemas import ChartDataQueryContextSchema
from superset.connectors.connector_registry import ConnectorRegistry
from superset.models.cache import CacheKey
from superset.utils.core import (
AdhocMetricExpressionType,
ChartDataResultFormat,
@@ -141,7 +142,6 @@ def test_query_context_time_range_endpoints(self):
query_object = query_context.queries[0]
extras = query_object.to_dict()["extras"]
self.assertTrue("time_range_endpoints" in extras)

self.assertEqual(
extras["time_range_endpoints"],
(TimeRangeEndpoint.INCLUSIVE, TimeRangeEndpoint.EXCLUSIVE),
@@ -180,6 +180,9 @@ def test_csv_response_format(self):
self.assertIn("name,sum__num\n", data)
self.assertEqual(len(data.split("\n")), 12)

ck = db.session.query(CacheKey).order_by(CacheKey.id.desc()).first()
assert ck.datasource_uid == "3__table"

def test_sql_injection_via_groupby(self):
"""
Ensure that calling invalid columns names in groupby are caught
