-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathcreate_documents.py
244 lines (191 loc) · 7.34 KB
/
create_documents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import importlib
import importlib.util
import inspect
import json
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict, List, Set, Union, cast
import datamodel_code_generator
from event_model.basemodels import ALL_BASEMODELS
from .type_wrapper import BaseModel, to_snake
# Directory where the generated JSON schema files are written.
JSONSCHEMA = Path(__file__).parent.parent / "jsonschemas"
# Directory where the generated TypedDict document modules are written.
DOCUMENTS = Path(__file__).parent.parent / "documents"
# Used to add user written schema to autogenerated schema.
def merge_dicts(dict1: dict, dict2: dict) -> dict:
    """Merge ``dict1`` into a copy of ``dict2`` and return the result.

    On a shared key: if the values' types differ, ``dict1``'s value wins;
    nested dicts are merged recursively; lists are concatenated with
    ``dict1``'s entries first; any other same-type value keeps ``dict2``'s.
    Keys only in ``dict1`` are added as-is. Neither input is mutated.
    """
    merged = dict2.copy()
    for key, value in dict1.items():
        if key not in dict2:
            merged[key] = value
        elif not isinstance(value, type(dict2[key])):
            # Type conflict: prefer the user-written (dict1) side.
            merged[key] = value
        elif isinstance(value, dict):
            merged[key] = merge_dicts(value, dict2[key])
        elif isinstance(value, list):
            merged[key] = value + dict2[key]
    return merged
def sort_alphabetically(schema: Dict) -> Dict:
    """Return an ``OrderedDict`` copy of *schema* with its keys in
    ascending order."""
    # Keys in a dict are unique, so sorting the item tuples never falls
    # through to comparing the values.
    return OrderedDict(sorted(schema.items()))
# Preferred position of each top-level JSON-schema key when a schema is
# dumped; keys not listed here sort after all of these (see sort_schema).
SortOrder = {
    "title": 0,
    "description": 1,
    "type": 2,
    "$defs": 3,
    "properties": 4,
    "required": 5,
    "patternProperties": 6,
    "additionalProperties": 7,
}
def sort_schema(document_schema: Dict[str, Any]) -> Dict[str, Any]:
    """Arrange a JSON-schema dict into a canonical, reproducible order.

    Top-level keys follow ``SortOrder`` (unknown keys come last, keeping
    their relative order). Inside ``$defs``/``properties``/``required``:
    dict values are alphabetised and recursively canonicalised; list
    values are sorted in place.
    """
    assert isinstance(document_schema, dict)
    ordered = OrderedDict(
        sorted(
            document_schema.items(),
            key=lambda item: SortOrder.get(item[0], len(SortOrder)),
        )
    )
    for section in ("$defs", "properties", "required"):
        if section not in ordered:
            continue
        value = ordered[section]
        if isinstance(value, dict):
            ordered[section] = sort_alphabetically(value)
            for name, subschema in ordered[section].items():
                if isinstance(subschema, dict):
                    ordered[section][name] = sort_schema(subschema)
        elif isinstance(value, list):
            value.sort()
    return ordered
def dump_json(schema: Dict[str, Any], jsonschema_path: Path):
    """Canonically sort ``schema`` and write it to ``jsonschema_path``.

    NOTE(review): always returns True — the previous docstring claimed it
    reported whether the basemodel had to change, which the code never did.
    """
    sorted_schema = sort_schema(schema)
    with jsonschema_path.open(mode="w") as f:
        json.dump(sorted_schema, f, indent=4)
    return True
def remove_subschema(
    schema: Dict[str, Any], subschema: Dict[str, Any]
) -> Dict[str, Any]:
    """Strip the keys of ``subschema`` out of ``schema`` in place.

    A non-dict value removes the matching key outright; a dict value is
    pruned recursively, and the key is dropped once its dict is empty.
    Returns the (mutated) ``schema`` for convenience.
    """
    for key in list(subschema):
        if key not in schema:
            continue
        if isinstance(subschema[key], dict):
            schema[key] = remove_subschema(schema[key], subschema[key])
            if not schema[key]:
                del schema[key]
        else:
            del schema[key]
    return schema
def jsonschema_differs_from_saved(
    schema: Dict[str, Any],
    jsonschema_path: Path,
    ignore_schema: Union[Dict[str, Any], None] = None,
):
    """
    Check if the schema at the given file path differs from the one passed in,
    ignoring the extra schema.

    Returns True when ``jsonschema_path`` does not exist, or when its
    contents (after stripping ``ignore_schema`` and ``additionalProperties``)
    differ from ``schema``.

    NOTE: mutates both ``schema`` and the loaded file schema; callers
    should pass a copy if they need the original intact.
    """
    if not jsonschema_path.exists():
        # Fixed garbled debug print ("+++ PATH DOESN@T EXIST").
        print(f"Schema file does not exist: {jsonschema_path}")
        return True
    with jsonschema_path.open("r") as file:
        file_schema = json.load(file)
    # Remove extra schema keys from the file schema if extra_schema is provided
    if ignore_schema:
        file_schema = remove_subschema(file_schema, ignore_schema)
    # additionalProperties varies independently of the model, so exclude it
    # from the comparison on both sides.
    file_schema.pop("additionalProperties", None)
    schema.pop("additionalProperties", None)
    return sort_schema(file_schema) != sort_schema(schema)
def import_basemodels(path: Path) -> List[type[BaseModel]]:
    """Import the module at *path* and collect every ``BaseModel`` subclass
    it exposes (excluding ``BaseModel`` itself).

    Raises ``RuntimeError`` when a module spec/loader cannot be built.
    """
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Failed to import {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    basemodels: List[type[BaseModel]] = []
    for attr_name in dir(module):
        candidate = getattr(module, attr_name)
        if (
            inspect.isclass(candidate)
            and issubclass(candidate, BaseModel)
            and candidate != BaseModel
        ):
            basemodels.append(candidate)
    return basemodels
def generate_typeddict(jsonschema_path: Path, documents_path=DOCUMENTS):
    """Generate a TypedDict module from a JSON schema file.

    Writes ``<schema stem>.py`` under *documents_path*, then prepends a
    ``# ruff: noqa`` line so the autogenerated code is not linted.
    """
    output_path = documents_path / f"{jsonschema_path.stem}.py"
    datamodel_code_generator.generate(
        input_=jsonschema_path,
        input_file_type=datamodel_code_generator.InputFileType.JsonSchema,
        output=output_path,
        output_model_type=datamodel_code_generator.DataModelType.TypingTypedDict,
        use_schema_description=True,
        use_field_description=True,
        use_annotated=True,
        field_constraints=True,
        wrap_string_literal=True,
    )
    # Rewrite the file with the lint suppression on top.
    generated = output_path.read_text()
    output_path.write_text("# ruff: noqa\n" + generated)
def get_jsonschema_path(jsonschema: Dict, parent_path=JSONSCHEMA) -> Path:
    """Map a schema's ``title`` to its snake_case ``.json`` file path."""
    filename = f"{to_snake(jsonschema['title'])}.json"
    return parent_path / filename
def generate_jsonschema(
    basemodel: type[BaseModel],
    jsonschema_parent_path=JSONSCHEMA,
    documents_parent_path=DOCUMENTS,
) -> Set[Path]:
    """Write the JSON schema (and, if it changed, the TypedDict module)
    for *basemodel* and recursively for each of its BaseModel parents.

    Returns the set of schema file paths written.
    """
    all_schema = set()
    # NOTE(review): pop() permanently removes json_schema_extra from the
    # model's config so model_json_schema() below excludes it — confirm no
    # caller needs the extra schema on the class afterwards.
    schema_extra: Dict[str, Any] = cast(
        Dict[str, Any], basemodel.model_config.pop("json_schema_extra", {})
    )
    model_jsonschema = basemodel.model_json_schema()
    jsonschema_path = get_jsonschema_path(
        model_jsonschema, parent_path=jsonschema_parent_path
    )
    # Compare against the saved file, ignoring the user-maintained extras;
    # a copy is passed because the comparison mutates its input.
    if jsonschema_differs_from_saved(
        model_jsonschema.copy(),
        jsonschema_path,
        ignore_schema=schema_extra,
    ):
        print(f"Detected change in {basemodel}, updating schema.")
        # Dump with the extra schema that we want to leave out of
        # the TypedDict conversion
        dump_json(model_jsonschema, jsonschema_path=jsonschema_path)
        generate_typeddict(jsonschema_path, documents_path=documents_parent_path)
    else:
        print(f"No change in {basemodel}.")
    # Dump the schema with the extra schema, also updates extra schema
    # if it's changed.
    dump_json(
        sort_schema(merge_dicts(model_jsonschema, schema_extra)),
        jsonschema_path=jsonschema_path,
    )
    all_schema.add(jsonschema_path)
    # Recurse up the inheritance tree so parent document schemas stay in
    # sync as well.
    for parent in [parent for parent in basemodel.__bases__ if parent is not BaseModel]:
        assert issubclass(
            parent, BaseModel
        )  # Parents of BaseModel's can only be other BaseModel
        all_schema.update(
            generate_jsonschema(
                parent,
                jsonschema_parent_path=jsonschema_parent_path,
                documents_parent_path=documents_parent_path,
            )
        )
    return all_schema
def generate():
    """Regenerate every document schema/TypedDict and rewrite the package
    ``__init__.py`` with one star-import per generated module."""
    all_schema: Set[Path] = set()
    for basemodel in ALL_BASEMODELS:
        all_schema.update(generate_jsonschema(basemodel))
    # Using the schema path since it will have the same stem as the TypedDict files
    import_lines = sorted(
        f"from .{schema_path.stem} import * # noqa: F403"
        for schema_path in all_schema
    )
    with open(DOCUMENTS / "__init__.py", "w") as f:
        f.write("\n".join(import_lines) + "\n")