Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added an _AccessResp and _AccessStats class to keep track of pelicanF… #70

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 86 additions & 12 deletions src/pelicanfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import asyncio
import threading
import logging
from typing import Optional, List, Tuple, Dict

logger = logging.getLogger("fsspec.pelican")

Expand All @@ -47,6 +48,58 @@ class InvalidMetadata(PelicanException):
No Pelican metadata was found for the federation
"""

class _AccessResp:
turetske marked this conversation as resolved.
Show resolved Hide resolved
"""
A class representing a cache response.

Include's the full html request path, called access_path, which is a the cache's url
and the namespace prefix. It also includes whether the access was successful and
an error string if it exists
"""
def __init__(self, path: str, success: bool, error: Optional[str] = None):
self.access_path = path
self.success = success
self.error = error

def __repr__(self) -> str:
if self.error:
return f"{{NamespacePath: {self.access_path}, Success: {self.success}, Error: {self.error}}}"
return f"{{NamespacePath: {self.access_path}, Success: {self.success}}}"

class _AccessStats:
def __init__(self):
"""
Manage the cache access stats

For each namespace path, keep a list of the last three cache responses, including the
full cache url plus the object path, a boolean which is true if the access was
successful and false otherwise, and an optional error string if the access returned an error
"""
self.data: Dict[str, List[_AccessResp]] = {}

def add_response(self, namespace_path: str, response: _AccessResp) -> None:
"""
Add the most recent _CacheResp to for the namespace path, removing the oldest response if
there are more than three responses already
"""
if namespace_path not in self.data:
self.data[namespace_path] = []

if len(self.data[namespace_path]) >= 3:
self.data[namespace_path].pop(0)

self.data[namespace_path].append(response)

def get_responses(self, namespace_path: str) -> Tuple[List[_AccessResp], bool]:
if namespace_path in self.data:
return self.data[namespace_path], True
return [], False

def print(self) -> None:
for key, value in self.data.items():
print(f"{key}: {' '.join(map(str, value))}")


class _CacheManager(object):
"""
Manage a list of caches.
Expand Down Expand Up @@ -126,6 +179,7 @@ def __init__ (

self._namespace_cache = cachetools.TTLCache(maxsize=50, ttl=15*60)
self._namespace_lock = threading.Lock()
self._access_stats = _AccessStats()

self.token = kwargs.get('headers', {}).get('Authorization')

Expand Down Expand Up @@ -189,6 +243,9 @@ def _remove_host_from_paths(paths):

return paths

def get_access_data(self,):
return self._access_stats

async def _discover_federation_metadata(self, discUrl):
"""
Returns the json response from a GET call to the metadata discovery url of the federation
Expand Down Expand Up @@ -301,7 +358,6 @@ async def get_dirlist_url(self, fileloc: str) -> str:
"""
Returns a dirlist host url for the given namespace locations
"""

if not self.directorUrl:
metadata_json = await self._discover_federation_metadata(self.discoveryUrl)
# Ensure the director url has a '/' at the end
Expand All @@ -328,7 +384,7 @@ async def get_dirlist_url(self, fileloc: str) -> str:

def _get_prefix_info(self, path: str) -> _CacheManager:
"""
Given a path into the filesystem, return the information inthe
Given a path into the filesystem, return the information in the
namespace cache (if any)
"""
namespace_info = None
Expand All @@ -348,7 +404,7 @@ def _match_namespace(self, fileloc: str):

return namespace_info.get_url(fileloc)

def _bad_cache(self, url: str):
def _bad_cache(self, url: str, e: Exception):
"""
Given a URL of a cache transfer that failed, record
the corresponding cache as a "bad cache" in the namespace
Expand All @@ -359,6 +415,10 @@ def _bad_cache(self, url: str):
cache_url = cache_url._replace(query="", path="", fragment="")
bad_cache = cache_url.geturl()

ar = _AccessResp(url, False, str(e))
self._access_stats.add_response(path, ar)


namespace_info = self._get_prefix_info(path)
if not namespace_info:
return
Expand Down Expand Up @@ -486,8 +546,8 @@ def _io_wrapper(self, func):
def io_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
self._bad_cache(self.path)
except Exception as e:
self._bad_cache(self.path, e)
raise
return io_wrapper

Expand All @@ -499,8 +559,8 @@ def _async_io_wrapper(self, func):
async def io_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
self._bad_cache(self.path)
except Exception as e:
self._bad_cache(self.path, e)
raise

return io_wrapper
Expand All @@ -527,6 +587,8 @@ def open(self, path, mode, **kwargs):
data_url = sync(self.loop, self.get_origin_url if self.directReads else self.get_working_cache, path)
fp = self.httpFileSystem.open(data_url, mode, **kwargs)
fp.read = self._io_wrapper(fp.read)
ar = _AccessResp(data_url, True)
self._access_stats.add_response(path, ar)
return fp

async def open_async(self, path, **kwargs):
Expand All @@ -537,6 +599,8 @@ async def open_async(self, path, **kwargs):
data_url = self.get_working_cache(path)
fp = await self.httpFileSystem.open_async(data_url, **kwargs)
fp.read = self._async_io_wrapper(fp.read)
ar = _AccessResp(data_url, True)
self._access_stats.add_response(path, ar)
return fp

def _cache_dec(func):
Expand All @@ -557,9 +621,11 @@ async def wrapper(self, *args, **kwargs):
dataUrl = await self.get_working_cache(path)
try:
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
self._bad_cache(dataUrl)
except Exception as e:
self._bad_cache(dataUrl, e)
raise
ar = _AccessResp(dataUrl, True)
self._access_stats.add_response(path, ar)
return result
return wrapper

Expand Down Expand Up @@ -591,13 +657,21 @@ async def wrapper(self, *args, **kwargs):
dataUrl.append(dUrl)
try:
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
except Exception as e:
if isinstance(dataUrl, list):
for dUrl in dataUrl:
self._bad_cache(dUrl)
self._bad_cache(dUrl, e)
else:
self._bad_cache(dataUrl)
self._bad_cache(dataUrl, e)
raise
if isinstance(dataUrl, list):
for dUrl in dataUrl:
ar = _AccessResp(dUrl, True)
ns_path = self._remove_host_from_path(dUrl)
self._access_stats.add_response(ns_path, ar)
else:
ar = _AccessResp(dataUrl, True)
self._access_stats.add_response(path, ar)
return result
return wrapper

Expand Down
5 changes: 5 additions & 0 deletions test/test_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,11 @@ def test_open_fallback(httpserver: HTTPServer, httpserver2: HTTPServer, get_clie
with pytest.raises(NoAvailableSource):
assert pelfs.cat("/foo/bar")

response, e = pelfs.get_access_data().get_responses("/foo/bar")
assert e
assert len(response) == 3
assert response[2].success == False

def test_open_preferred(httpserver: HTTPServer, httpserver2: HTTPServer, get_client):
foo_bar_url = httpserver.url_for("/foo/bar")
httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")})
Expand Down
67 changes: 67 additions & 0 deletions test/test_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""
Copyright (C) 2024, Pelican Project, Morgridge Institute for Research

Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You may
obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import aiohttp
import pytest
import pelicanfs.core
from pelicanfs.core import _AccessResp, _AccessStats, PelicanException, PelicanFileSystem, NoAvailableSource
import ssl
import trustme

from pytest_httpserver import HTTPServer


def test_response_management():
results1 = [_AccessResp("https://bad-cache/ns_path", False, PelicanException),
_AccessResp("https://good-cache/ns_path", True),
_AccessResp("https://good-cache/ns_path", True)]

results2 = [_AccessResp("https://good-cache/ns_path", True),
_AccessResp("https://good-cache/ns_path", True),
_AccessResp("https://third-cache/ns_path", False, PelicanException)]

aStats = _AccessStats()

# Add a bad response
ar_bad = _AccessResp("https://bad-cache/ns_path", False, PelicanException)
aStats.add_response("ns_path", ar_bad)

# Add a good response
ar_good = _AccessResp("https://good-cache/ns_path", True)
aStats.add_response("ns_path", ar_good)

# Add a good response
aStats.add_response("ns_path", ar_good)

# Check results
k, e = aStats.get_responses("ns_path")
assert e
assert str(k) == str(results1)

# Add another response
ar_new = _AccessResp("https://third-cache/ns_path", False, PelicanException)
aStats.add_response("ns_path", ar_new)

# Check that only the most recent three responses are available
k, e = aStats.get_responses("ns_path")
assert e
assert len(k) == 3
assert str(k) == str(results2)

# Test no responses for path
k, e = aStats.get_responses("no_path")
assert e == False

turetske marked this conversation as resolved.
Show resolved Hide resolved