
Merge pull request #37 from turetske/expand-endpoints
Expand endpoints
bbockelm authored May 11, 2024
2 parents 700b415 + 5153b7b commit 9996cc0
Showing 5 changed files with 269 additions and 71 deletions.
42 changes: 38 additions & 4 deletions README.md
@@ -28,12 +28,12 @@ pip install -e .

### Using PelicanFS

To use pelicanfs, first create a `PelicanFileSystem` and provide it with the url for the director of your data federation. As an example using the OSDF director
To use pelicanfs, first create a `PelicanFileSystem` and provide it with the Pelican federation URL. As an example, using the OSDF federation:

```python
from pelicanfs.core import PelicanFileSystem

pelfs = PelicanFileSystem("https://osdf-director.osg-htc.org/")
pelfs = PelicanFileSystem("pelican://osg-htc.org")
```

Once `pelfs` is pointed at your federation's director, fsspec commands can be applied to Pelican namespaces. For example:
@@ -45,7 +45,7 @@ print(hello_world)

### Getting an FSMap

Sometimes various systems that interact with an fsspec want a key-value mapper rather than a url. To do that, call the `PelicanMap` function with the namespace path and a `PelicanFileSystem` object rather than using the fsspec `get_mapper` call. For example
Sometimes various systems that interact with fsspec want a key-value mapper rather than a URL. To get one, call the `PelicanMap` function with the namespace path and a `PelicanFileSystem` object rather than using the fsspec `get_mapper` call. For example:

```python
from pelicanfs.core import PelicanFileSystem, PelicanMap
import xarray
@@ -54,4 +54,38 @@ pelfs = PelicanFileSystem("some-director-url")
file1 = PelicanMap("/namespace/file/1", pelfs=pelfs)
file2 = PelicanMap("/namespace/file/2", pelfs=pelfs)
ds = xarray.open_mfdataset([file1, file2], engine='zarr')
```
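
Under the hood, `PelicanMap` resolves a working endpoint for the namespace path (a cache, or the origin when direct reads are enabled) and returns an fsspec key-value mapper pointed at it.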

### Specifying Endpoints

The following describes how to specify which endpoints PelicanFS retrieves data from, rather than letting PelicanFS and the director determine the best cache. PelicanFS allows you to read directly from the origin (bypassing data staging altogether) or to name a specific cache to stage data into.

**Note**
> If both direct reads and a specific cache are set, PelicanFS will use the specified cache and ignore the direct reads setting.
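
As an illustration of that precedence (a minimal sketch; `https://cache.example.com` is a hypothetical cache URL), a filesystem constructed with both options set behaves as if only the preferred cache had been given:

```python
from pelicanfs.core import PelicanFileSystem

# Both options set: the named cache takes precedence,
# so direct_reads is ignored.
pelfs = PelicanFileSystem(
    "pelican://osg-htc.org",
    direct_reads=True,  # ignored because preferred_caches is set
    preferred_caches=["https://cache.example.com"],
)
```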

#### Enabling Direct Reads

Sometimes you may wish to read data directly from the origin rather than via a cache. To enable this, pass `direct_reads=True` to the `PelicanFileSystem` constructor:

```python
pelfs = PelicanFileSystem("pelican://osg-htc.org", direct_reads=True)
```
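
With `direct_reads=True`, PelicanFS asks the director for the object's origin endpoint (its `/api/v1.0/director/origin/` API) and reads from the origin directly, skipping caches entirely.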

#### Specifying a Cache

To stage your data through a specific cache (as opposed to the highest-priority working cache), pass a cache URL to the `PelicanFileSystem` constructor via the `preferred_caches` argument:

```python
pelfs = PelicanFileSystem("pelican://osg-htc.org", preferred_caches=["https://cache.example.com"])
```

or

```python
pelfs = PelicanFileSystem("pelican://osg-htc.org", preferred_caches=["https://cache.example.com",
"https://cache2.example.com", "+"])
```

Note that the special cache value `"+"` indicates that the provided preferred caches should be prepended to the
list of caches from the director.
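
As a sketch of the resulting behavior (the director-provided cache hostnames below are hypothetical):

```python
# With preferred_caches=["https://cache.example.com", "+"], and a director
# that would return cacheA and cacheB for the namespace, caches are tried
# in this order:
#   1. https://cache.example.com   (preferred, tried first)
#   2. https://cacheA.example.org  (from the director)
#   3. https://cacheB.example.org  (from the director)
# Without "+", only the explicitly listed caches are consulted.
pelfs = PelicanFileSystem(
    "pelican://osg-htc.org",
    preferred_caches=["https://cache.example.com", "+"],
)
```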
2 changes: 1 addition & 1 deletion setup.py
@@ -45,7 +45,7 @@
"yarl==1.9.4",
"cachetools~=5.3"],
extras_require={
"testing": ["pytest", "pytest-httpserver"],
"testing": ["pytest", "pytest-httpserver", "trustme"],
},
project_urls={
"Source": "https://github.com/PelicanPlatform/pelicanfs",
187 changes: 129 additions & 58 deletions src/pelicanfs/core.py
@@ -33,14 +33,16 @@ class PelicanException(RuntimeError):
"""
Base class for all Pelican-related failures
"""
pass

class NoAvailableSource(PelicanException):
"""
No source endpoint is currently available for the requested object
"""
pass

class InvalidMetadata(PelicanException):
"""
No Pelican metadata was found for the federation
"""

class _CacheManager(object):
"""
@@ -110,23 +112,26 @@ class PelicanFileSystem(AsyncFileSystem):

def __init__ (
self,
directorUrl,
federationDiscoveryUrl,
direct_reads = False,
preferred_caches = [],
asynchronous = False,
loop = None
loop = None,
**kwargs
):
super().__init__(self, asynchronous=asynchronous, loop=loop, **kwargs)

self._namespace_cache = cachetools.TTLCache(maxsize=50, ttl=15*60)
self._namespace_lock = threading.Lock()

# The internal filesystem
self.httpFileSystem = fshttp.HTTPFileSystem(asynchronous=asynchronous, loop=loop)
self.httpFileSystem = fshttp.HTTPFileSystem(asynchronous=asynchronous, loop=loop, **kwargs)

# Ensure the director url ends with a "/"
if directorUrl[-1] != "/":
directorUrl = directorUrl + "/"
self.directorUrl = directorUrl
self.discoveryUrl = federationDiscoveryUrl
self.directorUrl = ""


super().__init__(self, asynchronous=asynchronous, loop=loop)
self.directReads = direct_reads
self.preferredCaches = preferred_caches

# These are all not implemented in the http fsspec and as such are not implemented in the pelican fsspec
# They will raise NotImplementedErrors when called
@@ -143,32 +148,80 @@ def __init__ (
self._du = self.httpFileSystem._du
self._info = self.httpFileSystem._info

async def _discover_federation_metadata(self, discUrl):
"""
Returns the json response from a GET call to the metadata discovery url of the federation
"""
# Parse the url for federation discovery
discoveryUrl = urllib.parse.urlparse(discUrl)
discoveryUrl = discoveryUrl._replace(scheme="https", path="/.well-known/pelican-configuration")
session = await self.httpFileSystem.set_session()
async with session.get(discoveryUrl.geturl()) as resp:
if resp.status != 200:
raise InvalidMetadata()
return await resp.json(content_type="")

async def get_director_headers(self, fileloc):
async def get_director_headers(self, fileloc, origin=False) -> dict[str, str]:
"""
Returns the header response from a GET call to the director
"""
if fileloc[0] == "/":
fileloc = fileloc[1:]
url = self.directorUrl + fileloc

if not self.directorUrl:
metadata_json = await self._discover_federation_metadata(self.discoveryUrl)
# Ensure the director url has a '/' at the end
directorUrl = metadata_json.get('director_endpoint')
if not directorUrl:
raise InvalidMetadata()

if not directorUrl.endswith("/"):
directorUrl = directorUrl + "/"
self.directorUrl = directorUrl

if origin:
url = urllib.parse.urljoin(self.directorUrl, "/api/v1.0/director/origin/") + fileloc
else:
url = urllib.parse.urljoin(self.directorUrl, fileloc)
session = await self.httpFileSystem.set_session()
async with session.get(url, allow_redirects=False) as resp:
return resp.headers

async def get_working_cache(self, fileloc):
async def get_working_cache(self, fileloc: str) -> str:
"""
Returns the highest priority cache for the namespace that appears to be owrking
Returns the highest priority cache for the namespace that appears to be working
"""
cacheUrl = self._match_namespace(fileloc)
if cacheUrl:
return cacheUrl

headers = await self.get_director_headers(fileloc)
metalist, namespace = parse_metalink(headers)
goodEntry = False
# Calculate the list of applicable caches; this takes into account the
# preferredCaches for the filesystem. If '+' is a preferred cache, we
# add all the director-provided caches to the list (doing a round of de-dup)
cache_list = []
while metalist:
updatedUrl = metalist[0][0]
if self.preferredCaches:
cache_list = [urllib.parse.urljoin(cache, fileloc) if cache != "+" else "+" for cache in self.preferredCaches]
namespace = "/"
if not self.preferredCaches or ("+" in self.preferredCaches):
headers = await self.get_director_headers(fileloc)
metalist, namespace = parse_metalink(headers)
old_cache_list = cache_list
cache_list = []
cache_set = set()
new_caches = [entry[0] for entry in metalist]
for cache in old_cache_list:
if cache == "+":
for cache2 in new_caches:
if cache2 not in cache_set:
cache_set.add(cache2)
cache_list.append(cache2)
else:
cache_list.append(cache)
if not cache_list:
cache_list = new_caches

while cache_list:
updatedUrl = cache_list[0]
# Timeout response in seconds - the default response is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
session = await self.httpFileSystem.set_session()
@@ -178,15 +231,27 @@ async def get_working_cache(self, fileloc):
break
except (aiohttp.client_exceptions.ClientConnectorError, FileNotFoundError, asyncio.TimeoutError, asyncio.exceptions.TimeoutError):
pass
metalist = metalist[1:]
if not metalist:
cache_list = cache_list[1:]

if not cache_list:
# No working cache was found
raise NoAvailableSource()

with self._namespace_lock:
self._namespace_cache[namespace] = _CacheManager([i[0] for i in metalist])
self._namespace_cache[namespace] = _CacheManager(cache_list)

return updatedUrl

async def get_origin_url(self, fileloc: str) -> str:
"""
Returns an origin url for the given namespace location
"""
headers = await self.get_director_headers(fileloc, origin=True)
origin = headers.get("Location")
if not origin:
raise NoAvailableSource()
return origin

def _get_prefix_info(self, path: str) -> _CacheManager:
"""
Given a path into the filesystem, return the information in the
@@ -235,11 +300,7 @@ def _dirlist_dec(func):
async def wrapper(self, *args, **kwargs):
path = args[0]
parsedUrl = urllib.parse.urlparse(path)
headers = await self.get_director_headers(parsedUrl.path)
dirlistloc = get_dirlist_loc(headers)
if dirlistloc == None:
raise RuntimeError
listUrl = dirlistloc + "/" + parsedUrl.path
listUrl = await self.get_origin_url(parsedUrl.path)
result = await func(self, listUrl, *args[1:], **kwargs)
return result
return wrapper
@@ -259,11 +320,7 @@ async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
# Not using a decorator because it requires a yield
async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
parsedUrl = urllib.parse.urlparse(path)
headers = await self.get_director_headers(parsedUrl.path)
dirlistloc = get_dirlist_loc(headers)
if dirlistloc == "":
raise RuntimeError
listUrl = dirlistloc + "/" + path
listUrl = await self.get_origin_url(parsedUrl.path)
async for _ in self.httpFileSystem._walk(listUrl, maxdepth, on_error, **kwargs):
yield _

@@ -278,7 +335,6 @@ def io_wrapper(*args, **kwargs):
except:
self._bad_cache(self.path)
raise

return io_wrapper

def _async_io_wrapper(self, func):
@@ -296,15 +352,19 @@ async def io_wrapper(*args, **kwargs):
return io_wrapper

def open(self, path, **kwargs):
cache_url = sync(self.loop, self.get_working_cache, path)
fp = self.httpFileSystem.open(cache_url, **kwargs)
data_url = sync(self.loop, self.get_origin_url if self.directReads else self.get_working_cache, path)
fp = self.httpFileSystem.open(data_url, **kwargs)
fp.read = self._io_wrapper(fp.read)
return fp

async def open_async(self, path, **kwargs):
cache_url = sync(self.loop, self.get_working_cache, path)
fp = await self.httpFileSystem.open_async(cache_url, **kwargs)
if self.directReads:
data_url = await self.get_origin_url(path)
else:
data_url = await self.get_working_cache(path)
fp = await self.httpFileSystem.open_async(data_url, **kwargs)
fp.read = self._async_io_wrapper(fp.read)
return fp

def _cache_dec(func):
"""
@@ -319,14 +379,17 @@ def _cache_dec(func):
async def wrapper(self, *args, **kwargs):
path = args[0]
parsedUrl = urllib.parse.urlparse(path)
if parsedUrl.scheme == "http":
cacheUrl = path
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dataUrl = path
else:
cacheUrl = await self.get_working_cache(parsedUrl.path)
if self.directReads:
dataUrl = await self.get_origin_url(parsedUrl.path)
else:
dataUrl = await self.get_working_cache(parsedUrl.path)
try:
result = await func(self, cacheUrl, *args[1:], **kwargs)
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
self._bad_cache(cacheUrl)
self._bad_cache(dataUrl)
raise
return result
return wrapper
@@ -344,23 +407,33 @@ async def wrapper(self, *args, **kwargs):
path = args[0]
if isinstance(path, str):
parsedUrl = urllib.parse.urlparse(path)
if parsedUrl.scheme == "http":
cacheUrl = path
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dataUrl = path
else:
cacheUrl = await self.get_working_cache(parsedUrl.path)
if self.directReads:
dataUrl = await self.get_origin_url(parsedUrl.path)
else:
dataUrl = await self.get_working_cache(parsedUrl.path)
else:
cacheUrl = []
dataUrl = []
for p in path:
parsedUrl = urllib.parse.urlparse(p)
if parsedUrl.scheme == "http":
cUrl = p
if parsedUrl.scheme == "http" or parsedUrl.scheme == "https":
dUrl = p
else:
cUrl = cacheUrl = await self.get_working_cache(parsedUrl.path)
cacheUrl.append(cUrl)
if self.directReads:
dUrl = await self.get_origin_url(parsedUrl.path)
else:
dUrl = await self.get_working_cache(parsedUrl.path)
dataUrl.append(dUrl)
try:
result = await func(self, cacheUrl, *args[1:], **kwargs)
result = await func(self, dataUrl, *args[1:], **kwargs)
except:
self._bad_cache(cacheUrl)
if isinstance(dataUrl, list):
for dUrl in dataUrl:
self._bad_cache(dUrl)
else:
self._bad_cache(dataUrl)
raise
return result
return wrapper
@@ -402,10 +475,8 @@ class OSDFFileSystem(PelicanFileSystem):
protocol = "osdf"

def __init__(self, **kwargs):
# TODO: Once the base class takes `pelican://` URLs, switch to
# `pelican://osg-htc.org`
super().__init__("https://osdf-director.osg-htc.org", **kwargs)
super().__init__("pelican://osg-htc.org", **kwargs)

def PelicanMap(root, pelfs: PelicanFileSystem, check=False, create=False):
cache_url = sync(pelfs.loop, pelfs.get_working_cache, root)
return pelfs.get_mapper(cache_url, check=check, create=create)
dataUrl = sync(pelfs.loop, pelfs.get_origin_url if pelfs.directReads else pelfs.get_working_cache, root)
return pelfs.get_mapper(dataUrl, check=check, create=create)
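
As an end-to-end usage sketch of the updated classes (the namespace path below is hypothetical): `OSDFFileSystem` is simply a `PelicanFileSystem` pre-pointed at `pelican://osg-htc.org`, and constructor keywords such as `direct_reads` pass straight through.

```python
from pelicanfs.core import OSDFFileSystem

# Read an object from the OSDF federation, going directly to the origin.
osdf = OSDFFileSystem(direct_reads=True)
data = osdf.cat("/namespace/object.txt")  # hypothetical namespace path
```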
