forked from ggml-org/llama.cpp
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathts_converter.py
150 lines (121 loc) · 6.06 KB
/
ts_converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from typing import Any, Dict, List, Set, Tuple, Union
import json
from pydantic import Json
class SchemaToTypeScriptConverter:
    '''
    Renders (parsed) JSON Schema dictionaries as TypeScript type expressions.

    Typical use: call resolve_refs() once on the root schema so that every
    `$ref` is made absolute and registered in self._refs, then call visit()
    on the (sub)schema to get its TypeScript type as a string.
    '''
    # TODO: comments for arguments!
    #   // Get the price of a particular car model
    #   type get_car_price = (_: {
    #     // The name of the car model.
    #     car_name: string,
    #   }) => any;

    #   // get the weather of a location
    #   type get_weather = (_: {
    #     // where to get weather.
    #     location: string,
    #   }) => any;

    def __init__(self, allow_fetch: bool = True):
        '''
        allow_fetch: whether resolve_refs() may download `https://` schemas.
        '''
        # Absolute reference URL -> referenced (sub)schema; filled by resolve_refs().
        self._refs: Dict[str, Any] = {}
        # References currently being expanded by visit(); breaks reference cycles.
        self._refs_being_resolved: Set[str] = set()
        self._allow_fetch = allow_fetch

    def resolve_refs(self, schema: Any, url: str) -> Any:
        '''
        Resolves all $ref fields in the given schema, fetching any remote schemas,
        replacing $ref with absolute reference URL and populating self._refs with the
        respective referenced (sub)schema dictionaries.

        schema: parsed JSON schema; dictionaries holding a local `#/...` ref are
            mutated in place.
        url: base URL of `schema`, prepended to local `#/...` references.

        Returns the visited schema (the same object for dict input).

        Raises ValueError for refs that are neither `https://...` nor `#/...`,
        and AssertionError when fetching is disabled or a JSON pointer segment
        cannot be found.
        '''
        def visit(n: Any) -> Any:
            if isinstance(n, list):
                return [visit(x) for x in n]
            if not isinstance(n, dict):
                return n

            ref = n.get('$ref')
            # A non-string `$ref` is not a reference (e.g. a property that is
            # literally named "$ref" inside a `properties` map): just recurse.
            if not isinstance(ref, str) or ref in self._refs:
                for v in n.values():
                    visit(v)
                return n

            if ref.startswith('https://'):
                # Raised explicitly (not `assert`) so the guard survives `python -O`;
                # the exception type is kept for backward compatibility.
                if not self._allow_fetch:
                    raise AssertionError('Fetching remote schemas is not allowed (use --allow-fetch for force)')
                import requests

                frag_split = ref.split('#')
                base_url = frag_split[0]

                target = self._refs.get(base_url)
                if target is None:
                    # NOTE(review): no timeout is passed to requests.get here.
                    target = self.resolve_refs(requests.get(ref).json(), base_url)
                    self._refs[base_url] = target

                if len(frag_split) == 1 or frag_split[-1] == '':
                    # Whole-document reference: also register the exact spelling
                    # (e.g. with a trailing '#') so later lookups by `ref` succeed.
                    self._refs[ref] = target
                    return target
            elif ref.startswith('#/'):
                target = schema
                ref = f'{url}{ref}'
                n['$ref'] = ref
            else:
                raise ValueError(f'Unsupported ref {ref}')

            # Walk the JSON pointer (the part after '#') one segment at a time.
            for sel in ref.split('#')[-1].split('/')[1:]:
                if target is None or sel not in target:
                    raise AssertionError(f'Error resolving ref {ref}: {sel} not in {target}')
                target = target[sel]

            self._refs[ref] = target
            return n

        return visit(schema)

    def _desc_comment(self, schema: Any) -> str:
        '''
        Returns the schema's description as `// ...` comment line(s) followed by
        a newline, or '' when there is no (non-empty) description.
        '''
        if not isinstance(schema, dict):
            return ''
        desc = schema.get('description')
        if not desc:
            return ''
        return '// ' + desc.replace('\n', '\n// ') + '\n'

    def _build_object_rule(self, properties: List[Tuple[str, Any]], required: Set[str], additional_properties: Union[bool, Any]) -> str:
        '''
        Builds a TypeScript object type literal.

        properties: (name, schema) pairs, emitted in order.
        required: names emitted without the optional `?` marker.
        additional_properties: True or a schema adds a `[key: string]` index
            signature; False or None adds none.
        '''
        if additional_properties is True:
            additional_properties = {}
        elif additional_properties is False:
            additional_properties = None

        members = [
            f'{self._desc_comment(prop_schema)}{prop_name}{"" if prop_name in required else "?"}: {self.visit(prop_schema)}'
            for prop_name, prop_schema in properties
        ]
        if additional_properties is not None:
            members.append(
                f'{self._desc_comment(additional_properties)}[key: string]: {self.visit(additional_properties)}'
            )
        return '{\n' + ',\n'.join(members) + '\n}'

    def _array_of(self, item_schema: Any) -> str:
        '''
        Returns the TypeScript array type whose elements match item_schema
        (None means "unconstrained").
        '''
        element = 'any' if item_schema is None else self.visit(item_schema)
        # `a|b[]` parses as `a | (b[])` in TypeScript, so unions must be wrapped.
        # Wrapping whenever a '|' appears is occasionally redundant but always valid.
        if '|' in element:
            element = f'({element})'
        return f'{element}[]'

    def _build_array_rule(self, schema: Dict[str, Any]) -> str:
        '''
        Builds the TypeScript type of an array schema: `T[]` for homogeneous
        arrays, `[A, B]` for tuples (`prefixItems`, or a list-valued `items`),
        with a `...T[]` rest element when a tuple also constrains extra items.
        '''
        items = schema.get('items')
        prefix_items = schema.get('prefixItems')
        if isinstance(items, list):
            # Older drafts spell tuples as a list-valued `items`.
            prefix_items, items = items, None

        if prefix_items is None:
            return self._array_of(items)

        elements = [self.visit(item) for item in prefix_items]
        if items is True or isinstance(items, dict):
            elements.append('...' + self._array_of(items))
        return '[' + ', '.join(elements) + ']'

    def visit(self, schema: Any) -> str:
        '''
        Returns the TypeScript type expression for the given schema.

        `$ref` nodes are expanded through self._refs (see resolve_refs); refs
        that are unknown, or that are already being expanded higher up the call
        stack, are rendered as `any`.
        '''
        # Boolean schemas: `true` accepts anything, `false` accepts nothing.
        if isinstance(schema, bool):
            return 'any' if schema else 'never'

        ref = schema.get('$ref')
        if isinstance(ref, str) and ref in self._refs:
            if ref in self._refs_being_resolved:
                return 'any'
            self._refs_being_resolved.add(ref)
            try:
                return self.visit(self._refs[ref])
            finally:
                self._refs_being_resolved.discard(ref)

        schema_type = schema.get('type')
        schema_format = schema.get('format')

        if 'oneOf' in schema or 'anyOf' in schema:
            return '|'.join(self.visit(s) for s in schema.get('oneOf') or schema.get('anyOf') or [])

        if isinstance(schema_type, list):
            return '|'.join(self.visit({'type': t}) for t in schema_type)

        if 'const' in schema:
            return json.dumps(schema['const'])

        if 'enum' in schema:
            return '|'.join(json.dumps(v) for v in schema['enum'])

        if schema_type in (None, 'object') and (
            'properties' in schema
            or ('additionalProperties' in schema and schema['additionalProperties'] is not True)
        ):
            required = set(schema.get('required', []))
            properties = list(schema.get('properties', {}).items())
            return self._build_object_rule(properties, required, schema.get('additionalProperties'))

        if schema_type in (None, 'object') and 'allOf' in schema:
            required = set()
            properties = []

            def add_component(comp_schema, is_required):
                if (comp_ref := comp_schema.get('$ref')) is not None:
                    comp_schema = self._refs[comp_ref]

                if 'properties' in comp_schema:
                    for prop_name, prop_schema in comp_schema['properties'].items():
                        properties.append((prop_name, prop_schema))
                        if is_required:
                            required.add(prop_name)

            for t in schema['allOf']:
                if 'anyOf' in t:
                    # Properties of alternative components are all optional.
                    for tt in t['anyOf']:
                        add_component(tt, is_required=False)
                else:
                    add_component(t, is_required=True)

            return self._build_object_rule(properties, required, additional_properties={})

        if schema_type in (None, 'array') and ('items' in schema or 'prefixItems' in schema):
            return self._build_array_rule(schema)

        if schema_type in (None, 'string') and schema_format == 'date-time':
            return 'Date'

        if schema_type == 'object' or len(schema) == 0:
            return 'any'

        if schema_type == 'array':
            # No element constraints; `array` itself is not a TypeScript type.
            return 'any[]'

        return 'number' if schema_type == 'integer' else schema_type or 'any'