From 8079078c8fe926f74b5b5e3126193a1a220e2bc7 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Tue, 14 May 2024 16:46:05 -0500 Subject: [PATCH] Added preliminary authorization -- Can pass authorization as headers or as a query string -- Added basic pytests for auth headers/query strings -- Removed extraneous imports --- src/pelicanfs/core.py | 17 +++++++++------ test/test_director.py | 50 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/src/pelicanfs/core.py b/src/pelicanfs/core.py index 0b71faf..b0ff900 100644 --- a/src/pelicanfs/core.py +++ b/src/pelicanfs/core.py @@ -15,11 +15,8 @@ """ import cachetools -import fsspec -import fsspec.registry from fsspec.asyn import AsyncFileSystem, sync -from fsspec.spec import AbstractBufferedFile -from .dir_header_parser import parse_metalink, get_dirlist_loc +from .dir_header_parser import parse_metalink import fsspec.implementations.http as fshttp import aiohttp import urllib.parse @@ -124,6 +121,8 @@ def __init__ ( self._namespace_cache = cachetools.TTLCache(maxsize=50, ttl=15*60) self._namespace_lock = threading.Lock() + self.token = kwargs.get('headers', {}).get('Authorization') + # The internal filesystem self.httpFileSystem = fshttp.HTTPFileSystem(asynchronous=asynchronous, loop=loop, **kwargs) @@ -191,7 +190,9 @@ async def get_working_cache(self, fileloc: str) -> str: """ Returns the highest priority cache for the namespace that appears to be working """ - cacheUrl = self._match_namespace(fileloc) + fparsed = urllib.parse.urlparse(fileloc) + # Removing the query if need be + cacheUrl = self._match_namespace(fparsed.path) if cacheUrl: return cacheUrl @@ -200,7 +201,7 @@ async def get_working_cache(self, fileloc: str) -> str: # add all the director-provided caches to the list (doing a round of de-dup) cache_list = [] if self.preferredCaches: - cache_list = [urllib.parse.urljoin(cache, fileloc) if cache != "+" else "+" for cache in self.preferredCaches] + cache_list = [urllib.parse.urlparse(urllib.parse.urljoin(cache, fileloc))._replace(query=fparsed.query).geturl() if cache != "+" else "+" for cache in self.preferredCaches] namespace = "/" if not self.preferredCaches or ("+" in self.preferredCaches): headers = await self.get_director_headers(fileloc) @@ -208,7 +209,7 @@ async def get_working_cache(self, fileloc: str) -> str: old_cache_list = cache_list cache_list = [] cache_set = set() - new_caches = [entry[0] for entry in metalist] + new_caches = [urllib.parse.urlparse(entry[0])._replace(query=fparsed.query).geturl() for entry in metalist] for cache in old_cache_list: if cache == "+": for cache2 in new_caches: @@ -225,6 +226,8 @@ async def get_working_cache(self, fileloc: str) -> str: # Timeout response in seconds - the default response is 5 minutes timeout = aiohttp.ClientTimeout(total=5) session = await self.httpFileSystem.set_session() + if self.token: + session.headers["Authorization"] = self.token try: async with session.head(updatedUrl, timeout=timeout) as resp: if resp.status >= 200 and resp.status < 400: diff --git a/test/test_director.py b/test/test_director.py index 3954093..0e04b5e 100644 --- a/test/test_director.py +++ b/test/test_director.py @@ -208,3 +208,53 @@ def test_open_mapper(httpserver: HTTPServer, get_client): pelMap = pelicanfs.core.PelicanMap("/foo", pelfs=pelfs) assert pelMap['bar'] == b'hello, world!' + +def test_authorization_headers(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("/foo/bar") + test_headers_with_bearer = {"Authorization": "Bearer test"} + + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "Location": foo_bar_url, + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + + httpserver.expect_request("/foo/bar", headers=test_headers_with_bearer, method="HEAD").respond_with_data("hello, world!") + httpserver.expect_request("/foo/bar", headers=test_headers_with_bearer, method="GET").respond_with_data("hello, world!") + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + headers = test_headers_with_bearer + ) + + assert pelfs.cat("/foo/bar", headers={'Authorization': 'Bearer test'}) == b"hello, world!" + +def test_authz_query(httpserver: HTTPServer, get_client): + foo_bar_url = httpserver.url_for("/foo/bar") + + httpserver.expect_request("/.well-known/pelican-configuration").respond_with_json({"director_endpoint": httpserver.url_for("/")}) + httpserver.expect_oneshot_request("/foo/bar", method="GET").respond_with_data( + "", + status=307, + headers={"Link": f'<{foo_bar_url}>; rel="duplicate"; pri=1; depth=1', + "Location": foo_bar_url, + "X-Pelican-Namespace": "namespace=/foo" + }, + ) + + httpserver.expect_request("/foo/bar", query_string="authz=test", method="HEAD").respond_with_data("hello, world!") + httpserver.expect_request("/foo/bar", query_string="authz=test", method="GET").respond_with_data("hello, world!") + + pelfs = pelicanfs.core.PelicanFileSystem( + httpserver.url_for("/"), + get_client=get_client, + skip_instance_cache=True, + ) + + assert pelfs.cat("/foo/bar?authz=test") == b"hello, world!"