Skip to content

Commit

Permalink
Added preliminary authorization
Browse files Browse the repository at this point in the history
	-- Can pass authorization as headers or as a query string
	-- Added basic pytests for auth headers/query strings
	-- Removed extraneous imports
  • Loading branch information
turetske committed May 21, 2024
1 parent 17ada18 commit 8079078
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/pelicanfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
"""

import cachetools
import fsspec
import fsspec.registry
from fsspec.asyn import AsyncFileSystem, sync
from fsspec.spec import AbstractBufferedFile
from .dir_header_parser import parse_metalink, get_dirlist_loc
from .dir_header_parser import parse_metalink
import fsspec.implementations.http as fshttp
import aiohttp
import urllib.parse
Expand Down Expand Up @@ -124,6 +121,8 @@ def __init__ (
self._namespace_cache = cachetools.TTLCache(maxsize=50, ttl=15*60)
self._namespace_lock = threading.Lock()

self.token = kwargs.get('headers', {}).get('Authorization')

# The internal filesystem
self.httpFileSystem = fshttp.HTTPFileSystem(asynchronous=asynchronous, loop=loop, **kwargs)

Expand Down Expand Up @@ -191,7 +190,9 @@ async def get_working_cache(self, fileloc: str) -> str:
"""
Returns the highest priority cache for the namespace that appears to be working
"""
cacheUrl = self._match_namespace(fileloc)
fparsed = urllib.parse.urlparse(fileloc)
# Removing the query if need be
cacheUrl = self._match_namespace(fparsed.path)
if cacheUrl:
return cacheUrl

Expand All @@ -200,15 +201,15 @@ async def get_working_cache(self, fileloc: str) -> str:
# add all the director-provided caches to the list (doing a round of de-dup)
cache_list = []
if self.preferredCaches:
cache_list = [urllib.parse.urljoin(cache, fileloc) if cache != "+" else "+" for cache in self.preferredCaches]
cache_list = [urllib.parse.urlparse(urllib.parse.urljoin(cache, fileloc))._replace(query=fparsed.query).geturl() if cache != "+" else "+" for cache in self.preferredCaches]
namespace = "/"
if not self.preferredCaches or ("+" in self.preferredCaches):
headers = await self.get_director_headers(fileloc)
metalist, namespace = parse_metalink(headers)
old_cache_list = cache_list
cache_list = []
cache_set = set()
new_caches = [entry[0] for entry in metalist]
new_caches = [urllib.parse.urlparse(entry[0])._replace(query=fparsed.query).geturl() for entry in metalist]
for cache in old_cache_list:
if cache == "+":
for cache2 in new_caches:
Expand All @@ -225,6 +226,8 @@ async def get_working_cache(self, fileloc: str) -> str:
# Timeout response in seconds - the default response is 5 minutes
timeout = aiohttp.ClientTimeout(total=5)
session = await self.httpFileSystem.set_session()
if self.token:
session.headers["Authorization"] = self.token
try:
async with session.head(updatedUrl, timeout=timeout) as resp:
if resp.status >= 200 and resp.status < 400:
Expand Down
50 changes: 50 additions & 0 deletions test/test_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,53 @@ def test_open_mapper(httpserver: HTTPServer, get_client):

pelMap = pelicanfs.core.PelicanMap("/foo", pelfs=pelfs)
assert pelMap['bar'] == b'hello, world!'

def test_authorization_headers(httpserver: HTTPServer, get_client):
    """
    Check that an ``Authorization`` header given to the filesystem is sent
    on the HEAD and GET requests for the object.

    The mock server is registered as the director endpoint and is also the
    target named in the redirect's ``Link``/``Location`` headers, so every
    request in this test lands on the same ``httpserver``.
    """
    auth_headers = {"Authorization": "Bearer test"}
    director_url = httpserver.url_for("/")
    object_url = httpserver.url_for("/foo/bar")

    # Discovery document pointing the client at the mock server as director.
    httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json(
        {"director_endpoint": director_url}
    )

    # One-shot 307 for the first GET of the object; it carries no header
    # constraint, unlike the two handlers registered below.
    redirect_headers = {
        "Link": f'<{object_url}>; rel="duplicate"; pri=1; depth=1',
        "Location": object_url,
        "X-Pelican-Namespace": "namespace=/foo",
    }
    httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data(
        "",
        status=307,
        headers=redirect_headers,
    )

    # These handlers only match when the bearer token header is present.
    httpserver.expect_request(
        "/foo/bar", headers=auth_headers, method="HEAD"
    ).respond_with_data("hello, world!")
    httpserver.expect_request(
        "/foo/bar", headers=auth_headers, method="GET"
    ).respond_with_data("hello, world!")

    fs = pelicanfs.core.PelicanFileSystem(
        director_url,
        get_client=get_client,
        skip_instance_cache=True,
        headers=auth_headers,
    )

    payload = fs.cat("/foo/bar", headers={"Authorization": "Bearer test"})
    assert payload == b"hello, world!"

def test_authz_query(httpserver: HTTPServer, get_client):
    """
    Check that an ``authz`` query string on the requested path is kept on
    the HEAD and GET requests for the object.

    The mock server is registered as the director endpoint and is also the
    target named in the redirect's ``Link``/``Location`` headers, so every
    request in this test lands on the same ``httpserver``.
    """
    director_url = httpserver.url_for("/")
    object_url = httpserver.url_for("/foo/bar")

    # Discovery document pointing the client at the mock server as director.
    httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json(
        {"director_endpoint": director_url}
    )

    # One-shot 307 for the first GET of the object; note that the advertised
    # URLs carry no query string of their own.
    redirect_headers = {
        "Link": f'<{object_url}>; rel="duplicate"; pri=1; depth=1',
        "Location": object_url,
        "X-Pelican-Namespace": "namespace=/foo",
    }
    httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data(
        "",
        status=307,
        headers=redirect_headers,
    )

    # These handlers only match when the request carries ``authz=test``.
    httpserver.expect_request(
        "/foo/bar", query_string="authz=test", method="HEAD"
    ).respond_with_data("hello, world!")
    httpserver.expect_request(
        "/foo/bar", query_string="authz=test", method="GET"
    ).respond_with_data("hello, world!")

    fs = pelicanfs.core.PelicanFileSystem(
        director_url,
        get_client=get_client,
        skip_instance_cache=True,
    )

    payload = fs.cat("/foo/bar?authz=test")
    assert payload == b"hello, world!"

0 comments on commit 8079078

Please sign in to comment.