11"""OpenAPI core validation validators module"""
2+ import re
23from functools import cached_property
34from typing import Any
45from typing import Mapping
2324)
2425from openapi_core .protocols import Request
2526from openapi_core .protocols import WebhookRequest
26- from openapi_core .schema .parameters import get_value
27+ from openapi_core .schema .parameters import get_aslist
28+ from openapi_core .schema .parameters import get_deep_object_value
29+ from openapi_core .schema .parameters import get_explode
30+ from openapi_core .schema .parameters import get_style
31+ from openapi_core .schema .protocols import SuportsGetAll
32+ from openapi_core .schema .protocols import SuportsGetList
2733from openapi_core .spec import Spec
2834from openapi_core .templating .media_types .datatypes import MediaType
2935from openapi_core .templating .paths .datatypes import PathOperationServer
@@ -70,10 +76,14 @@ def __init__(
7076 self .extra_format_validators = extra_format_validators
7177 self .extra_media_type_deserializers = extra_media_type_deserializers
7278
73- def _get_media_type (self , content : Spec , mimetype : str ) -> MediaType :
79+ def _find_media_type (
80+ self , content : Spec , mimetype : Optional [str ] = None
81+ ) -> MediaType :
7482 from openapi_core .templating .media_types .finders import MediaTypeFinder
7583
7684 finder = MediaTypeFinder (content )
85+ if mimetype is None :
86+ return finder .get_first ()
7787 return finder .find (mimetype )
7888
7989 def _deserialise_media_type (self , mimetype : str , value : Any ) -> Any :
@@ -99,69 +109,93 @@ def _validate_schema(self, schema: Spec, value: Any) -> None:
99109 )
100110 validator .validate (value )
101111
102- def _get_param_or_header_value (
112+ def _get_param_or_header (
103113 self ,
104114 param_or_header : Spec ,
105115 location : Mapping [str , Any ],
106116 name : Optional [str ] = None ,
107117 ) -> Any :
108- casted , schema = self ._get_param_or_header_value_and_schema (
109- param_or_header , location , name
118+ # Simple scenario
119+ if "content" not in param_or_header :
120+ return self ._get_simple_param_or_header (
121+ param_or_header , location , name = name
122+ )
123+
124+ # Complex scenario
125+ return self ._get_complex_param_or_header (
126+ param_or_header , location , name = name
127+ )
128+
129+ def _get_simple_param_or_header (
130+ self ,
131+ param_or_header : Spec ,
132+ location : Mapping [str , Any ],
133+ name : Optional [str ] = None ,
134+ ) -> Any :
135+ try :
136+ raw = self ._get_style_value (param_or_header , location , name = name )
137+ except KeyError :
138+ # in simple scenrios schema always exist
139+ schema = param_or_header / "schema"
140+ if "default" not in schema :
141+ raise
142+ raw = schema ["default" ]
143+ return self ._convert_schema_style_value (raw , param_or_header )
144+
145+ def _get_complex_param_or_header (
146+ self ,
147+ param_or_header : Spec ,
148+ location : Mapping [str , Any ],
149+ name : Optional [str ] = None ,
150+ ) -> Any :
151+ content = param_or_header / "content"
152+ # no point to catch KetError
153+ # in complex scenrios schema doesn't exist
154+ raw = self ._get_media_type_value (param_or_header , location , name = name )
155+ return self ._convert_content_schema_value (raw , content )
156+
157+ def _convert_schema_style_value (
158+ self ,
159+ raw : Any ,
160+ param_or_header : Spec ,
161+ ) -> Any :
162+ casted , schema = self ._convert_schema_style_value_and_schema (
163+ raw , param_or_header
110164 )
111165 if schema is None :
112166 return casted
113167 self ._validate_schema (schema , casted )
114168 return casted
115169
116- def _get_content_value (
117- self , raw : Any , mimetype : str , content : Spec
170+ def _convert_content_schema_value (
171+ self , raw : Any , content : Spec , mimetype : Optional [ str ] = None
118172 ) -> Any :
119- casted , schema = self ._get_content_value_and_schema (
120- raw , mimetype , content
173+ casted , schema = self ._convert_content_schema_value_and_schema (
174+ raw , content , mimetype
121175 )
122176 if schema is None :
123177 return casted
124178 self ._validate_schema (schema , casted )
125179 return casted
126180
127- def _get_param_or_header_value_and_schema (
181+ def _convert_schema_style_value_and_schema (
128182 self ,
183+ raw : Any ,
129184 param_or_header : Spec ,
130- location : Mapping [str , Any ],
131- name : Optional [str ] = None ,
132185 ) -> Tuple [Any , Spec ]:
133- try :
134- raw_value = get_value (param_or_header , location , name = name )
135- except KeyError :
136- if "schema" not in param_or_header :
137- raise
138- schema = param_or_header / "schema"
139- if "default" not in schema :
140- raise
141- casted = schema ["default" ]
142- else :
143- # Simple scenario
144- if "content" not in param_or_header :
145- deserialised = self ._deserialise_style (
146- param_or_header , raw_value
147- )
148- schema = param_or_header / "schema"
149- # Complex scenario
150- else :
151- content = param_or_header / "content"
152- mimetype , media_type = next (content .items ())
153- deserialised = self ._deserialise_media_type (
154- mimetype , raw_value
155- )
156- schema = media_type / "schema"
157- casted = self ._cast (schema , deserialised )
186+ deserialised = self ._deserialise_style (param_or_header , raw )
187+ schema = param_or_header / "schema"
188+ casted = self ._cast (schema , deserialised )
158189 return casted , schema
159190
160- def _get_content_value_and_schema (
161- self , raw : Any , mimetype : str , content : Spec
191+ def _convert_content_schema_value_and_schema (
192+ self ,
193+ raw : Any ,
194+ content : Spec ,
195+ mimetype : Optional [str ] = None ,
162196 ) -> Tuple [Any , Optional [Spec ]]:
163- media_type , mimetype = self ._get_media_type (content , mimetype )
164- deserialised = self ._deserialise_media_type (mimetype , raw )
197+ media_type , mime_type = self ._find_media_type (content , mimetype )
198+ deserialised = self ._deserialise_media_type (mime_type , raw )
165199 casted = self ._cast (media_type , deserialised )
166200
167201 if "schema" not in media_type :
@@ -170,6 +204,45 @@ def _get_content_value_and_schema(
170204 schema = media_type / "schema"
171205 return casted , schema
172206
207+ def _get_style_value (
208+ self ,
209+ param_or_header : Spec ,
210+ location : Mapping [str , Any ],
211+ name : Optional [str ] = None ,
212+ ) -> Any :
213+ name = name or param_or_header ["name" ]
214+ style = get_style (param_or_header )
215+ if name not in location :
216+ # Only check if the name is not in the location if the style of
217+ # the param is deepObject,this is because deepObjects will never be found
218+ # as their key also includes the properties of the object already.
219+ if style != "deepObject" :
220+ raise KeyError
221+ keys_str = " " .join (location .keys ())
222+ if not re .search (rf"{ name } \[\w+\]" , keys_str ):
223+ raise KeyError
224+
225+ aslist = get_aslist (param_or_header )
226+ explode = get_explode (param_or_header )
227+ if aslist and explode :
228+ if style == "deepObject" :
229+ return get_deep_object_value (location , name )
230+ if isinstance (location , SuportsGetAll ):
231+ return location .getall (name )
232+ if isinstance (location , SuportsGetList ):
233+ return location .getlist (name )
234+
235+ return location [name ]
236+
237+ def _get_media_type_value (
238+ self ,
239+ param_or_header : Spec ,
240+ location : Mapping [str , Any ],
241+ name : Optional [str ] = None ,
242+ ) -> Any :
243+ name = name or param_or_header ["name" ]
244+ return location [name ]
245+
173246
174247class BaseAPICallValidator (BaseValidator ):
175248 @cached_property
0 commit comments