-
Notifications
You must be signed in to change notification settings - Fork 163
/
Copy pathflattener.py
186 lines (154 loc) · 7.25 KB
/
flattener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# pylint: disable=too-few-public-methods,raising-format-tuple
import logging
from ordered_set import OrderedSet
from .pointer import fragment_decode
from .utils import TYPE, ConstraintError, FlatteningError, schema_merge, traverse
# Module-level logger, per the standard library logging convention.
LOG = logging.getLogger(__name__)
# JSON Schema combiner keywords whose subschemas are squashed into the parent
# object during flattening (see JsonSchemaFlattener._flatten_combiners).
COMBINERS = ("oneOf", "anyOf", "allOf")
class JsonSchemaFlattener:
    """Flatten a schema into a minimal collection of objects by replacing
    nested objects with refs, and inlining refs to primitive types.

    The flattener makes certain assumptions while processing:

    1) The schema must be an object (not simply a boolean)
    2) Each property can only be a single type.
       A) For primitive types, combiners (``anyOf``, ``allOf``, and ``oneOf``)
          are left in place untouched
       B) For object types, the flattener will attempt to squash all properties
          specified in combiners into the object, and will fail if multiple types are
          declared.
    3) Truthy ``additionalProperties`` on objects are not allowed
    4) For objects, ``properties`` and ``patternProperties`` are mutually exclusive
    """

    def __init__(self, schema):
        # Maps a property path (tuple of keys) to its flattened sub-schema.
        # ``None`` is used as a temporary placeholder while a path is being
        # processed, so circular refs do not recurse forever.
        self._schema_map = {}
        self._full_schema = schema

    def flatten_schema(self):
        """Flatten the full schema, returning the map of path -> sub-schema."""
        self._walk(self._full_schema, ())
        return self._schema_map

    def _walk(self, sub_schema, property_path):
        """Recursively flatten ``sub_schema`` found at ``property_path``.

        Returns either the (possibly modified) sub-schema inline, or a
        ``{"$ref": path}`` stub when the sub-schema became its own entry
        in the schema map.
        """
        # have we already seen this path?
        if property_path in self._schema_map:
            return {"$ref": property_path}

        # placeholder so as to not reprocess
        self._schema_map[property_path] = None

        # work on shallow copy to avoid modifying the schema
        sub_schema = dict(sub_schema)

        # is it a reference?
        try:
            ref_path = sub_schema["$ref"]
        except KeyError:
            # schemas without type are assumed to be objects
            json_type = sub_schema.get("type", "object")
            if json_type == "array":
                sub_schema = self._flatten_array_type(sub_schema, property_path)
            elif json_type == "object":
                sub_schema = self._flatten_object_type(sub_schema, property_path)
        else:
            sub_schema = self._flatten_ref_type(ref_path)

        # if the path was never added to the schema map, remove placeholder
        if self._schema_map[property_path] is None:
            self._schema_map.pop(property_path)

        # for primitive types, we are done processing
        return sub_schema

    def _flatten_ref_type(self, ref_path):
        """This method flattens a schema ref.

        * Refs to an object will have its own class, so the ref will be returned as is.
        * Refs to a primitive will be inlined into the schema, removing the ref.

        :raises FlatteningError: if the ref cannot be parsed
        """
        if isinstance(ref_path, tuple):
            # we have already processed the ref, usually via flattening combiners
            ref_parts = ref_path
        else:
            try:
                ref_parts = fragment_decode(ref_path)
            except ValueError as e:
                # chain the cause so the original parse error stays visible
                raise FlatteningError(
                    f"Invalid ref at path '{ref_path}': {e}"
                ) from e
        ref_schema, ref_parts, _ref_parent = self._find_subschema_by_ref(ref_parts)
        return self._walk(ref_schema, ref_parts)

    def _flatten_array_type(self, sub_schema, path):
        """Flatten an array schema by resolving its ``items`` sub-schema.

        :raises ConstraintError: if ``additionalItems`` is truthy
        """
        # if "additionalItems" is truthy (e.g. a non-empty object), then fail
        if sub_schema.get("additionalItems"):
            raise ConstraintError("Object at '{path}' has 'additionalItems'", path)

        # if "items" is defined, resolve it
        try:
            items_schema = sub_schema["items"]
        except KeyError:
            pass
        else:
            sub_schema["items"] = self._walk(items_schema, path + ("items",))
        return sub_schema

    def _flatten_object_type(self, sub_schema, path):
        """Flatten an object schema, registering it in the schema map when it
        declares ``properties`` (returning a ref stub in that case).

        :raises ConstraintError: if ``additionalProperties`` is truthy, or if
            both ``properties`` and ``patternProperties`` are declared
        """
        # we only care about allOf, anyOf, oneOf for object types
        sub_schema = self._flatten_combiners(sub_schema, path)

        # if "additionalProperties" is truthy (e.g. a non-empty object), then fail
        if sub_schema.get("additionalProperties"):
            raise ConstraintError("Object at '{path}' has 'additionalProperties'", path)

        # don't allow these together
        if "properties" in sub_schema and "patternProperties" in sub_schema:
            msg = (
                "Object at '{path}' has mutually exclusive "
                "'properties' and 'patternProperties'"
            )
            raise ConstraintError(msg, path)

        # if "properties" are defined, resolve each property
        try:
            properties = sub_schema["properties"]
        except KeyError:
            pass
        else:
            # resolve each property schema
            new_properties = {}
            for prop_name, prop_schema in properties.items():
                new_properties[prop_name] = self._walk(
                    prop_schema, path + ("properties", prop_name)
                )

            # replace properties with resolved properties
            sub_schema["properties"] = new_properties
            self._schema_map[path] = sub_schema
            return {"$ref": path}

        # if "patternProperties" are defined, resolve each property
        try:
            pattern_properties = sub_schema["patternProperties"]
        except KeyError:
            pass
        else:
            new_pattern_properties = {}
            for pattern, prop_schema in pattern_properties.items():
                new_pattern_properties[pattern] = self._walk(
                    prop_schema, path + ("patternProperties", pattern)
                )
            sub_schema["patternProperties"] = new_pattern_properties

        return sub_schema

    def _flatten_combiners(self, sub_schema, path):
        """This method iterates through allOf, anyOf, and oneOf schemas and
        merges them all into the surrounding sub_schema"""
        for arr_key in COMBINERS:
            try:
                schema_array = sub_schema.pop(arr_key)
            except KeyError:
                pass
            else:
                for i, nested_schema in enumerate(schema_array):
                    ref_path = path + (arr_key, i)
                    ref_path_is_used = ref_path in self._schema_map
                    walked_schema = self._walk(nested_schema, ref_path)

                    # if no other schema is referencing the ref_path,
                    # we no longer need the refkey since the properties will be squashed
                    if ref_path_is_used:
                        resolved_schema = self._schema_map.get(ref_path)
                    else:
                        resolved_schema = self._schema_map.pop(ref_path, walked_schema)
                    schema_merge(sub_schema, resolved_schema, path)

        # schema_merge may leave an OrderedSet of types; normalize to a list
        if isinstance(sub_schema.get(TYPE), OrderedSet):
            sub_schema[TYPE] = list(sub_schema[TYPE])
        return sub_schema

    def _find_subschema_by_ref(self, ref_path):
        """Outputs the section of the schema corresponding to the JSON Schema reference

        :param str ref_path: json schema ref, like "#/definitions/prop"
        :return: the subschema corresponding to the ref
        :raises FlatteningError: if the ref does not resolve within the schema
        """
        try:
            return traverse(self._full_schema, ref_path)
        except (LookupError, ValueError) as e:
            # chain the cause so the original lookup error stays visible
            raise FlatteningError(f"Invalid ref: {ref_path}") from e