@@ -197,14 +197,43 @@ class TestOAuthFlow:
197197 """Test OAuth flow methods."""
198198
199199 @pytest .mark .anyio
200- async def test_discover_protected_resource_request (self , oauth_provider ):
201- """Test protected resource discovery request building."""
202- request = await oauth_provider ._discover_protected_resource ()
200+ async def test_discover_protected_resource_request (self , client_metadata , mock_storage ):
201+ """Test protected resource discovery request building maintains backward compatibility."""
203202
203+ async def redirect_handler (url : str ) -> None :
204+ pass
205+
206+ async def callback_handler () -> tuple [str , str | None ]:
207+ return "test_auth_code" , "test_state"
208+
209+ provider = OAuthClientProvider (
210+ server_url = "https://api.example.com" ,
211+ client_metadata = client_metadata ,
212+ storage = mock_storage ,
213+ redirect_handler = redirect_handler ,
214+ callback_handler = callback_handler ,
215+ )
216+
217+ # Test without WWW-Authenticate (fallback)
218+ init_response = httpx .Response (
219+ status_code = 401 , headers = {}, request = httpx .Request ("GET" , "https://request-api.example.com" )
220+ )
221+
222+ request = await provider ._discover_protected_resource (init_response )
204223 assert request .method == "GET"
205224 assert str (request .url ) == "https://api.example.com/.well-known/oauth-protected-resource"
206225 assert "mcp-protocol-version" in request .headers
207226
227+ # Test with WWW-Authenticate header
228+ init_response .headers ["WWW-Authenticate" ] = (
229+ 'Bearer resource_metadata="https://prm.example.com/.well-known/oauth-protected-resource/path"'
230+ )
231+
232+ request = await provider ._discover_protected_resource (init_response )
233+ assert request .method == "GET"
234+ assert str (request .url ) == "https://prm.example.com/.well-known/oauth-protected-resource/path"
235+ assert "mcp-protocol-version" in request .headers
236+
208237 @pytest .mark .anyio
209238 async def test_discover_oauth_metadata_request (self , oauth_provider ):
210239 """Test OAuth metadata discovery request building."""
@@ -660,3 +689,114 @@ def test_build_metadata(
660689 "code_challenge_methods_supported" : ["S256" ],
661690 }
662691 )
692+
693+
694+ class TestProtectedResourceWWWAuthenticate :
695+ """Test RFC9728 WWW-Authenticate header parsing functionality for protected resource."""
696+
697+ @pytest .mark .parametrize (
698+ "www_auth_header,expected_url" ,
699+ [
700+ # Quoted URL
701+ (
702+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
703+ "https://api.example.com/.well-known/oauth-protected-resource" ,
704+ ),
705+ # Unquoted URL
706+ (
707+ "Bearer resource_metadata=https://api.example.com/.well-known/oauth-protected-resource" ,
708+ "https://api.example.com/.well-known/oauth-protected-resource" ,
709+ ),
710+ # Complex header with multiple parameters
711+ (
712+ 'Bearer realm="api", resource_metadata="https://api.example.com/.well-known/oauth-protected-resource", '
713+ 'error="insufficient_scope"' ,
714+ "https://api.example.com/.well-known/oauth-protected-resource" ,
715+ ),
716+ # Different URL format
717+ ('Bearer resource_metadata="https://custom.domain.com/metadata"' , "https://custom.domain.com/metadata" ),
718+ # With path and query params
719+ (
720+ 'Bearer resource_metadata="https://api.example.com/auth/metadata?version=1"' ,
721+ "https://api.example.com/auth/metadata?version=1" ,
722+ ),
723+ ],
724+ )
725+ def test_extract_resource_metadata_from_www_auth_valid_cases (
726+ self , client_metadata , mock_storage , www_auth_header , expected_url
727+ ):
728+ """Test extraction of resource_metadata URL from various valid WWW-Authenticate headers."""
729+
730+ async def redirect_handler (url : str ) -> None :
731+ pass
732+
733+ async def callback_handler () -> tuple [str , str | None ]:
734+ return "test_auth_code" , "test_state"
735+
736+ provider = OAuthClientProvider (
737+ server_url = "https://api.example.com/v1/mcp" ,
738+ client_metadata = client_metadata ,
739+ storage = mock_storage ,
740+ redirect_handler = redirect_handler ,
741+ callback_handler = callback_handler ,
742+ )
743+
744+ init_response = httpx .Response (
745+ status_code = 401 ,
746+ headers = {"WWW-Authenticate" : www_auth_header },
747+ request = httpx .Request ("GET" , "https://api.example.com/test" ),
748+ )
749+
750+ result = provider ._extract_resource_metadata_from_www_auth (init_response )
751+ assert result == expected_url
752+
753+ @pytest .mark .parametrize (
754+ "status_code,www_auth_header,description" ,
755+ [
756+ # No header
757+ (401 , None , "no WWW-Authenticate header" ),
758+ # Empty header
759+ (401 , "" , "empty WWW-Authenticate header" ),
760+ # Header without resource_metadata
761+ (401 , 'Bearer realm="api", error="insufficient_scope"' , "no resource_metadata parameter" ),
762+ # Malformed header
763+ (401 , "Bearer resource_metadata=" , "malformed resource_metadata parameter" ),
764+ # Non-401 status code
765+ (
766+ 200 ,
767+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
768+ "200 OK response" ,
769+ ),
770+ (
771+ 500 ,
772+ 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource"' ,
773+ "500 error response" ,
774+ ),
775+ ],
776+ )
777+ def test_extract_resource_metadata_from_www_auth_invalid_cases (
778+ self , client_metadata , mock_storage , status_code , www_auth_header , description
779+ ):
780+ """Test extraction returns None for invalid cases."""
781+
782+ async def redirect_handler (url : str ) -> None :
783+ pass
784+
785+ async def callback_handler () -> tuple [str , str | None ]:
786+ return "test_auth_code" , "test_state"
787+
788+ provider = OAuthClientProvider (
789+ server_url = "https://api.example.com/v1/mcp" ,
790+ client_metadata = client_metadata ,
791+ storage = mock_storage ,
792+ redirect_handler = redirect_handler ,
793+ callback_handler = callback_handler ,
794+ )
795+
796+ headers = {"WWW-Authenticate" : www_auth_header } if www_auth_header is not None else {}
797+ init_response = httpx .Response (
798+ status_code = status_code , headers = headers , request = httpx .Request ("GET" , "https://api.example.com/test" )
799+ )
800+
801+ result = provider ._extract_resource_metadata_from_www_auth (init_response )
802+ assert result is None , f"Should return None for { description } "
0 commit comments