Skip to content

Commit

Permalink
Fix Workflow failure cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Sivarajan Narayanan committed Aug 10, 2024
1 parent cc3cbd9 commit fa85476
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 38 deletions.
117 changes: 85 additions & 32 deletions superset/async_events/cache_backend.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,30 @@
from typing import Optional, Dict, List, Any, Tuple
from flask_caching.backends.rediscache import RedisCache, RedisSentinelCache
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional, Tuple

import redis
from flask_caching.backends.rediscache import RedisCache, RedisSentinelCache
from redis.sentinel import Sentinel


class RedisCacheBackend(RedisCache):
MAX_EVENT_COUNT = 100

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
host: str,
port: int,
Expand All @@ -17,9 +35,9 @@ def __init__(
ssl: bool = False,
ssl_certfile: Optional[str] = None,
ssl_keyfile: Optional[str] = None,
ssl_cert_reqs: str = 'required',
ssl_cert_reqs: str = "required",
ssl_ca_certs: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> None:
super().__init__(
host=host,
Expand All @@ -28,7 +46,7 @@ def __init__(
db=db,
default_timeout=default_timeout,
key_prefix=key_prefix,
**kwargs
**kwargs,
)
self._cache = redis.Redis(
host=host,
Expand All @@ -40,18 +58,30 @@ def __init__(
ssl_keyfile=ssl_keyfile,
ssl_cert_reqs=ssl_cert_reqs,
ssl_ca_certs=ssl_ca_certs,
**kwargs
**kwargs,
)

def xadd(self, stream_name: str, event_data: Dict[str, Any], event_id: str = "*", maxlen: Optional[int] = None) -> str:
def xadd(
self,
stream_name: str,
event_data: Dict[str, Any],
event_id: str = "*",
maxlen: Optional[int] = None,
) -> str:
return self._cache.xadd(stream_name, event_data, event_id, maxlen)

Check warning on line 71 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L71

Added line #L71 was not covered by tests

def xrange(self, stream_name: str, start: str = "-", end: str = "+", count: Optional[int] = None) -> List[Any]:
def xrange(
self,
stream_name: str,
start: str = "-",
end: str = "+",
count: Optional[int] = None,
) -> List[Any]:
count = count or self.MAX_EVENT_COUNT
return self._cache.xrange(stream_name, start, end, count)

Check warning on line 81 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L80-L81

Added lines #L80 - L81 were not covered by tests

@classmethod
def from_config(cls, config: Dict[str, Any]) -> 'RedisCacheBackend':
def from_config(cls, config: Dict[str, Any]) -> "RedisCacheBackend":
kwargs = {
"host": config.get("CACHE_REDIS_HOST", "localhost"),
"port": config.get("CACHE_REDIS_PORT", 6379),
Expand All @@ -62,15 +92,16 @@ def from_config(cls, config: Dict[str, Any]) -> 'RedisCacheBackend':
"ssl": config.get("CACHE_REDIS_SSL", False),
"ssl_certfile": config.get("CACHE_REDIS_SSL_CERTFILE", None),
"ssl_keyfile": config.get("CACHE_REDIS_SSL_KEYFILE", None),
"ssl_cert_reqs": config.get("CACHE_REDIS_SSL_CERT_REQS", 'required'),
"ssl_cert_reqs": config.get("CACHE_REDIS_SSL_CERT_REQS", "required"),
"ssl_ca_certs": config.get("CACHE_REDIS_SSL_CA_CERTS", None),
}
return cls(**kwargs)


class RedisSentinelCacheBackend(RedisSentinelCache):
MAX_EVENT_COUNT = 100

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
sentinels: List[Tuple[str, int]],
master: str,
Expand All @@ -82,43 +113,53 @@ def __init__(
ssl: bool = False,
ssl_certfile: Optional[str] = None,
ssl_keyfile: Optional[str] = None,
ssl_cert_reqs: str = 'required',
ssl_cert_reqs: str = "required",
ssl_ca_certs: Optional[str] = None,
**kwargs: Any
**kwargs: Any,
) -> None:
# Sentinel dont directly support SSL
# Initialize Sentinel without SSL parameters
self._sentinel = Sentinel(

Check warning on line 122 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L122

Added line #L122 was not covered by tests
sentinels,
sentinel_kwargs={
'password': sentinel_password,
"password": sentinel_password,
},
**{
k: v
for k, v in kwargs.items()
if k
not in [
"ssl",
"ssl_certfile",
"ssl_keyfile",
"ssl_cert_reqs",
"ssl_ca_certs",
]
},
**{k: v for k, v in kwargs.items() if k not in ['ssl', 'ssl_certfile', 'ssl_keyfile', 'ssl_cert_reqs', 'ssl_ca_certs']}
)

# Prepare SSL-related arguments for master_for method
master_kwargs = {

Check warning on line 142 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L142

Added line #L142 was not covered by tests
'password': password,
'ssl': ssl,
'ssl_certfile': ssl_certfile if ssl else None,
'ssl_keyfile': ssl_keyfile if ssl else None,
'ssl_cert_reqs': ssl_cert_reqs if ssl else None,
'ssl_ca_certs': ssl_ca_certs if ssl else None,
"password": password,
"ssl": ssl,
"ssl_certfile": ssl_certfile if ssl else None,
"ssl_keyfile": ssl_keyfile if ssl else None,
"ssl_cert_reqs": ssl_cert_reqs if ssl else None,
"ssl_ca_certs": ssl_ca_certs if ssl else None,
}

# If SSL is False, remove all SSL-related keys
# SSL_* are expected only if SSL is True
if not ssl:
master_kwargs = {k: v for k, v in master_kwargs.items() if not k.startswith('ssl')}
master_kwargs = {

Check warning on line 154 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L153-L154

Added lines #L153 - L154 were not covered by tests
k: v for k, v in master_kwargs.items() if not k.startswith("ssl")
}

# Filter out None values from master_kwargs
master_kwargs = {k: v for k, v in master_kwargs.items() if v is not None}

Check warning on line 159 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L159

Added line #L159 was not covered by tests

# Initialize Redis master connection
self._cache = self._sentinel.master_for(
master,
**master_kwargs
)
self._cache = self._sentinel.master_for(master, **master_kwargs)

Check warning on line 162 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L162

Added line #L162 was not covered by tests

# Call the parent class constructor
super().__init__(

Check warning on line 165 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L165

Added line #L165 was not covered by tests
Expand All @@ -128,18 +169,30 @@ def __init__(
db=db,
default_timeout=default_timeout,
key_prefix=key_prefix,
**kwargs
**kwargs,
)

def xadd(self, stream_name: str, event_data: Dict[str, Any], event_id: str = "*", maxlen: Optional[int] = None) -> str:
def xadd(
self,
stream_name: str,
event_data: Dict[str, Any],
event_id: str = "*",
maxlen: Optional[int] = None,
) -> str:
return self._cache.xadd(stream_name, event_data, event_id, maxlen)

Check warning on line 182 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L182

Added line #L182 was not covered by tests

def xrange(self, stream_name: str, start: str = "-", end: str = "+", count: Optional[int] = None) -> List[Any]:
def xrange(
self,
stream_name: str,
start: str = "-",
end: str = "+",
count: Optional[int] = None,
) -> List[Any]:
count = count or self.MAX_EVENT_COUNT
return self._cache.xrange(stream_name, start, end, count)

Check warning on line 192 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L191-L192

Added lines #L191 - L192 were not covered by tests

@classmethod
def from_config(cls, config: Dict[str, Any]) -> 'RedisSentinelCacheBackend':
def from_config(cls, config: Dict[str, Any]) -> "RedisSentinelCacheBackend":
kwargs = {

Check warning on line 196 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L196

Added line #L196 was not covered by tests
"sentinels": config.get("CACHE_REDIS_SENTINELS", [("127.0.0.1", 26379)]),
"master": config.get("CACHE_REDIS_SENTINEL_MASTER", "mymaster"),
Expand All @@ -150,7 +203,7 @@ def from_config(cls, config: Dict[str, Any]) -> 'RedisSentinelCacheBackend':
"ssl": config.get("CACHE_REDIS_SSL", False),
"ssl_certfile": config.get("CACHE_REDIS_SSL_CERTFILE", None),
"ssl_keyfile": config.get("CACHE_REDIS_SSL_KEYFILE", None),
"ssl_cert_reqs": config.get("CACHE_REDIS_SSL_CERT_REQS", 'required'),
"ssl_cert_reqs": config.get("CACHE_REDIS_SSL_CERT_REQS", "required"),
"ssl_ca_certs": config.get("CACHE_REDIS_SSL_CA_CERTS", None),
}
return cls(**kwargs)

Check warning on line 209 in superset/async_events/cache_backend.py

View check run for this annotation

Codecov / codecov/patch

superset/async_events/cache_backend.py#L209

Added line #L209 was not covered by tests
4 changes: 2 additions & 2 deletions tests/integration_tests/async_events/api_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional, Type
from typing import Any, Optional, Type
from unittest import mock

import redis
Expand All @@ -38,7 +38,7 @@ def fetch_events(self, last_id: Optional[str] = None):
uri = f"{base_uri}?last_id={last_id}" if last_id else base_uri
return self.client.get(uri)

def run_test_with_cache_backend(self, cache_backend_cls: Type, test_func):
def run_test_with_cache_backend(self, cache_backend_cls: Type[Any], test_func):
app._got_first_request = False
async_query_manager.init_app(app)

Expand Down
13 changes: 9 additions & 4 deletions tests/integration_tests/tasks/async_queries_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class TestAsyncQueries(SupersetTestCase):
@pytest.mark.usefixtures(
"load_birth_names_data", "load_birth_names_dashboard_with_slices"
)
def test_load_chart_data_into_cache(self):
@mock.patch.object(async_query_manager, "update_job")
@mock.patch("superset.tasks.async_queries.set_form_data")
def test_load_chart_data_into_cache(self, mock_set_form_data, mock_update_job):
from superset.tasks.async_queries import load_chart_data_into_cache

app._got_first_request = False
Expand All @@ -77,10 +79,13 @@ def test_load_chart_data_into_cache(self):

load_chart_data_into_cache(job_metadata, query_context)

async_query_manager.update_job.assert_called_once_with(
mock_set_form_data.assert_called_once_with(query_context)

mock_update_job.assert_called_once_with(
job_metadata, "done", result_url=mock.ANY
)
async_query_manager.update_job.reset_mock()
mock_set_form_data.reset_mock()
mock_update_job.reset_mock()

@mock.patch.object(
ChartDataCommand, "run", side_effect=ChartDataQueryFailedError("Error: foo")
Expand Down Expand Up @@ -189,7 +194,7 @@ def test_load_explore_json_into_cache(self, mock_update_job):

load_explore_json_into_cache(job_metadata, form_data)

async_query_manager.update_job.assert_called_once_with(
mock_update_job.assert_called_once_with(
job_metadata, "done", result_url=mock.ANY
)
mock_update_job.reset_mock()
Expand Down

0 comments on commit fa85476

Please sign in to comment.