-
Notifications
You must be signed in to change notification settings - Fork 39
/
process_includes.py
executable file
·361 lines (313 loc) · 11.2 KB
/
process_includes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
#!/usr/bin/env python
"""
python %prog [options] <in_schema.xsd> <out_schema.xsd>
Synopsis:
Prepare schema document. Replace include and import elements.
Examples:
python %prog myschema.xsd
python %prog myschema.xsd newschema.xsd
python %prog -f myschema.xsd newschema.xsd
cat infile.xsd | python %prog > outfile.xsd
"""
#
# Imports
import sys
import os
import urllib2
import ftplib
import copy
import types
from optparse import OptionParser, Values
import itertools
from copy import deepcopy
from lxml import etree
#
# Globals and constants
#
# Do not modify the following VERSION comments.
# Used by updateversion.py.
##VERSION##
VERSION = '2.7c'
##VERSION##
# URI of the W3C XML Schema namespace.
Xsd_namespace_uri = 'http://www.w3.org/2001/XMLSchema'
# Prefix-to-URI map handed to xpath() calls that use the "xs:" prefix.
Namespaces = {'xs': Xsd_namespace_uri}
#
# Functions for external use
def process_include_files(infile, outfile, inpath=''):
    """Run schema preparation with a default options object.

    Entry point for code that imports this module rather than running it
    as a script.  An options object with ``force`` set to False is built
    and everything is handed to prep_schema_doc().

    Arguments:
        infile -- forwarded to prep_schema_doc() as the schema input.
        outfile -- forwarded to prep_schema_doc() as the output target.
        inpath -- forwarded to prep_schema_doc(); defaults to ''.
    """
    default_options = Values(dict(force=False))
    prep_schema_doc(infile, outfile, inpath, default_options)
#
# Classes
class Params(object):
    """State shared while include/import references are being resolved.

    Only the attribute names listed in ``members`` can be assigned; an
    attempt to set any other name raises AttributeError.
    """

    members = ('base_url', 'already_processed', 'parent_url', )

    def __init__(self):
        self.base_url = None
        # Starts out as an empty set.
        self.already_processed = set()
        self.parent_url = None

    def __setattr__(self, name, value):
        """Store *value* under *name* if *name* is one of ``members``."""
        if name in self.members:
            self.__dict__[name] = value
            return
        raise AttributeError(
            'Class %s has no set-able attribute "%s"' % (
                self.__class__.__name__, name, ))
class SchemaIOError(IOError):
    """Raised when a referenced schema document cannot be found."""
class RaiseComplexTypesError(Exception):
    """Raised when no unused name can be found for a raised complexType."""
#
# Functions for internal use and testing
def clear_includes_and_imports(node):
    """Replace the include and import children of *node* with comments.

    Every direct child ``include`` element, then every direct child
    ``import`` element (both in the namespace bound to *node*'s own
    prefix), is swapped for an XML comment that holds the serialized text
    of the element it replaces.

    Arguments:
        node -- element whose children are modified in place.
    """
    schema_ns = node.nsmap[node.prefix]
    for local_name in ('include', 'import'):
        qualified_tag = '{%s}%s' % (schema_ns, local_name, )
        for directive in node.iterfind(qualified_tag):
            placeholder = etree.Comment(etree.tostring(directive))
            placeholder.tail = '\n'
            node.replace(directive, placeholder)
def resolve_ref(node, params, options):
    """Fetch the text of the schema named by an include/import element.

    The ``schemaLocation`` attribute of *node* is joined to
    ``params.base_url`` unless it is absolute (starts with ``/``) or is an
    http:, https: or ftp: URL.  A location is read at most once per
    *params* object.  After a successful read, ``params.parent_url`` is
    set to the location just loaded and ``params.base_url`` to its
    directory part.

    Arguments:
        node -- object whose ``get('schemaLocation')`` gives the location.
        params -- object with ``base_url``, ``parent_url`` and the
            ``already_processed`` set.
        options -- not used by this function.

    Returns:
        The content of the referenced document, or None when the
        attribute is missing or empty (a warning is written to stderr) or
        when the location has already been processed.

    Raises:
        SchemaIOError -- the local file does not exist, or opening the URL
            raised urllib2.HTTPError.
    """
    remote_prefixes = ('http:', 'https:', 'ftp:')
    url = node.get('schemaLocation')
    if not url:
        msg = '*** Warning: missing "schemaLocation" attribute in %s\n' % (
            params.parent_url, )
        sys.stderr.write(msg)
        return None
    is_relative = not (
        url.startswith('/') or url.startswith(remote_prefixes))
    if params.base_url and is_relative:
        locn = '%s/%s' % (params.base_url, url, )
    else:
        locn = url
    # Relative locations are remembered by absolute path; absolute paths
    # and URLs are remembered exactly as written in the attribute.
    if is_relative:
        schema_name = os.path.abspath(locn)
    else:
        schema_name = url
    if schema_name in params.already_processed:
        return None
    params.already_processed.add(schema_name)
    if locn.startswith(remote_prefixes):
        try:
            urlfile = urllib2.urlopen(locn)
        except urllib2.HTTPError:
            msg = "Can't find file %s referenced in %s." % (
                locn, params.parent_url, )
            raise SchemaIOError(msg)
        # Release the connection even when reading fails part way through.
        try:
            content = urlfile.read()
        finally:
            urlfile.close()
    else:
        if not os.path.exists(locn):
            msg = "Can't find file %s referenced in %s." % (
                locn, params.parent_url, )
            raise SchemaIOError(msg)
        infile = open(locn)
        # Close the file even when reading raises.
        try:
            content = infile.read()
        finally:
            infile.close()
    params.parent_url = locn
    params.base_url = os.path.split(locn)[0]
    return content
def collect_inserts(node, params, inserts, options):
    """Process every include and import child of *node*.

    Direct child ``include`` elements are visited first, then direct child
    ``import`` elements (both in the namespace bound to *node*'s own
    prefix).  Each is passed to collect_inserts_aux() together with the
    other arguments.

    Arguments:
        node -- element whose include/import children are visited.
        params -- passed through to collect_inserts_aux().
        inserts -- passed through to collect_inserts_aux().
        options -- passed through to collect_inserts_aux().
    """
    schema_ns = node.nsmap[node.prefix]
    include_tag = '{%s}include' % (schema_ns, )
    import_tag = '{%s}import' % (schema_ns, )
    directives = itertools.chain(
        node.iterfind(include_tag), node.iterfind(import_tag))
    for directive in directives:
        collect_inserts_aux(directive, params, inserts, options)
def collect_inserts_aux(child, params, inserts, options):
    """Load the schema referenced by *child* and queue its content.

    The document returned by resolve_ref() is parsed and its top-level
    children are appended to *inserts*.  Comments are appended as they
    are.  Any other child, except ``include`` and ``import`` elements, is
    preceded by a comment holding the serialized text of *child*.  The
    include/import elements are not appended; collect_inserts() is then
    called on the loaded root to handle them.  ``params.base_url`` is
    restored to its entry value before returning.

    Arguments:
        child -- the include or import element being resolved.
        params -- resolution state passed to resolve_ref().
        inserts -- list that receives the collected nodes.
        options -- passed through to resolve_ref() and collect_inserts().
    """
    save_base_url = params.base_url
    string_content = resolve_ref(child, params, options)
    if string_content is not None:
        root = etree.fromstring(string_content, base_url=params.base_url)
        for child1 in root:
            if isinstance(child1, etree._Comment):
                inserts.append(child1)
                continue
            namespace = child1.nsmap[child1.prefix]
            # The second tag was previously the truncated '{%s', which no
            # element tag can equal, so import elements were never skipped.
            skipped_tags = (
                '{%s}include' % (namespace, ),
                '{%s}import' % (namespace, ),
            )
            if child1.tag in skipped_tags:
                continue
            comment = etree.Comment(etree.tostring(child))
            comment.tail = '\n'
            inserts.append(comment)
            inserts.append(child1)
        collect_inserts(root, params, inserts, options)
    params.base_url = save_base_url
def make_file(outFileName, options):
    """Open *outFileName* for writing, asking first if it already exists.

    When ``options.force`` is false and the file exists, the user is
    prompted; only the exact reply ``y`` allows the file to be opened.

    Arguments:
        outFileName -- path of the file to create or overwrite.
        options -- object whose ``force`` attribute disables the prompt.

    Returns:
        The file object opened in 'w' mode, or None when the user did not
        answer ``y``.
    """
    if options.force or not os.path.exists(outFileName):
        return open(outFileName, 'w')
    reply = raw_input('File %s exists. Overwrite? (y/n): ' % outFileName)
    if reply == 'y':
        return open(outFileName, 'w')
    return None
def prep_schema_doc(infile, outfile, inpath, options):
    """Write a copy of a schema with its includes and imports expanded.

    The input is parsed, the content referenced by its include/import
    elements is collected, and a copy of the root is built in which those
    elements are replaced by comments and the collected nodes are appended.
    process_groups() and raise_anon_complextypes() are then applied to the
    copy before it is written out.

    Arguments:
        infile -- source passed to etree.parse(); also recorded as the
            initial ``parent_url``.
        outfile -- target passed to the result tree's write() method.
        inpath -- path of the input schema, whose directory part becomes
            the initial base for resolving references.  May be None or ''
            (for example when reading standard input), in which case the
            base is empty.
        options -- passed through to collect_inserts().

    Returns:
        The ElementTree that was written to *outfile*.
    """
    doc1 = etree.parse(infile)
    root1 = doc1.getroot()
    params = Params()
    params.parent_url = infile
    # os.path.split() cannot take None, which is what prep_schema() passes
    # when there is no input path; fall back to an empty base.
    params.base_url = os.path.split(inpath or '')[0]
    inserts = []
    collect_inserts(root1, params, inserts, options)
    root2 = copy.copy(root1)
    clear_includes_and_imports(root2)
    for insert_node in inserts:
        root2.append(insert_node)
    process_groups(root2)
    raise_anon_complextypes(root2)
    doc2 = etree.ElementTree(root2)
    doc2.write(outfile)
    return doc2
def prep_schema(inpath, outpath, options):
    """Open the input and output streams and run prep_schema_doc().

    Arguments:
        inpath -- path of the schema to read; standard input is used when
            it is empty or None.
        outpath -- path of the file to write, opened through make_file();
            standard output is used when it is empty or None.
        options -- passed to make_file() and prep_schema_doc().

    Returns:
        None.  Nothing is processed when make_file() returns None.

    Files opened here are closed on every exit path, including when
    make_file() returns None and when processing raises.  Standard input
    and standard output are never closed.
    """
    if inpath:
        infile = open(inpath, 'r')
    else:
        infile = sys.stdin
    try:
        if outpath:
            outfile = make_file(outpath, options)
        else:
            outfile = sys.stdout
        if outfile is None:
            return
        try:
            prep_schema_doc(infile, outfile, inpath, options)
        finally:
            if outpath:
                outfile.close()
    finally:
        if inpath:
            infile.close()
def process_groups(root):
    """Expand references to named model groups found under *root*.

    Top-level ``xs:group`` elements that have a ``name`` attribute are
    indexed by that name with any prefix trimmed.  ``xs:group`` elements
    below top level that have a ``ref`` attribute are gathered as
    references.  Both are handed to replace_group_defs().

    Arguments:
        root -- root element of the schema; modified in place.
    """
    # Definitions: named xs:group elements directly under the root.
    group_defs = {}
    for group in root.xpath('./xs:group', namespaces=Namespaces):
        group_name = group.get('name')
        if group_name is not None:
            group_defs[trim_prefix(group_name)] = group
    # References: xs:group elements below top level with a "ref".
    group_refs = [
        group
        for group in root.xpath('./*//xs:group', namespaces=Namespaces)
        if group.get('ref') is not None
    ]
    replace_group_defs(group_defs, group_refs)
def replace_group_defs(def_dict, refs):
    """Replace each group reference with copies of the group's content.

    For a reference whose trimmed ``ref`` name is a key of *def_dict*, the
    first ``xs:sequence``, ``xs:choice`` or ``xs:all`` child of the
    definition is looked up.  Deep copies of that element's children are
    inserted before the reference, each given the reference's
    ``minOccurs``/``maxOccurs`` values when present, and the reference is
    then removed from its parent.  References that cannot be matched are
    left untouched.

    Arguments:
        def_dict -- mapping of group name to its definition element.
        refs -- iterable of reference elements carrying a ``ref``.
    """
    for ref_node in refs:
        name = trim_prefix(ref_node.get('ref'))
        if name is None:
            continue
        def_node = def_dict.get(name)
        if def_node is None:
            continue
        models = def_node.xpath(
            './xs:sequence|./xs:choice|./xs:all', namespaces=Namespaces)
        if not models:
            continue
        model = models[0]
        parent = ref_node.getparent()
        # The occurrence constraints on the reference are carried over to
        # every copied child.
        occurs = [
            (attr_name, ref_node.get(attr_name))
            for attr_name in ('minOccurs', 'maxOccurs')
        ]
        for member in model:
            clone = deepcopy(member)
            for attr_name, attr_value in occurs:
                if attr_value is not None:
                    clone.set(attr_name, attr_value)
            ref_node.addprevious(clone)
        parent.remove(ref_node)
def raise_anon_complextypes(root):
    """Move nested anonymous complexTypes to top level and name them.

    A marker comment is appended to *root* first.  Then every
    ``xs:complexType`` matched by ``./*/*//xs:complexType`` whose parent
    is an ``xs:element`` with a non-empty ``name`` is given the name
    ``<element name>Type``, made unique by unique_name() against the
    top-level complexType and simpleType names and the names assigned so
    far.  The parent's ``type`` attribute is set to that name and the
    complexType is appended to *root*.

    Arguments:
        root -- root element of the schema; modified in place.
    """
    element_tag = '{%s}element' % (Xsd_namespace_uri, )
    # Names already in use at top level: complexTypes, then simpleTypes.
    def_names = {}
    for query in ('./xs:complexType', './xs:simpleType'):
        for top_level_type in root.xpath(query, namespaces=Namespaces):
            def_names[top_level_type.get('name')] = top_level_type
    marker = etree.Comment(text="Raised anonymous complexType definitions")
    marker.tail = "\n\n"
    root.append(marker)
    nested_types = root.xpath('./*/*//xs:complexType', namespaces=Namespaces)
    for anon_type in nested_types:
        owner = anon_type.getparent()
        if owner.tag != element_tag:
            continue
        owner_name = owner.get('name')
        if not owner_name:
            continue
        type_name = unique_name('%sType' % (owner_name, ), def_names)
        def_names[type_name] = anon_type
        # Point the element at the raised type, then move the type itself.
        owner.set('type', type_name)
        anon_type.set('name', type_name)
        root.append(anon_type)
def unique_name(type_name, def_names):
    """Return *type_name*, or a numbered variant, that is not in use.

    The plain name is tried first, then the name with the suffixes 1
    through 99 in order.

    Arguments:
        type_name -- the preferred name.
        def_names -- container of names already taken.

    Raises:
        RaiseComplexTypesError -- all 100 candidates are already taken.
    """
    if type_name not in def_names:
        return type_name
    for suffix in range(1, 100):
        candidate = '%s%d' % (type_name, suffix, )
        if candidate not in def_names:
            return candidate
    raise RaiseComplexTypesError('duplicate name count max (100) exceeded')
def trim_prefix(name):
    """Strip a single ``prefix:`` from *name*.

    Returns the part after the colon when there is exactly one colon, the
    name unchanged when there is none, and None when there are two or
    more colons.
    """
    parts = name.split(':')
    if len(parts) > 2:
        return None
    # With one or two parts, the last one is the local name.
    return parts[-1]
# The module docstring doubles as the usage text given to OptionParser.
USAGE_TEXT = __doc__


def usage(parser):
    """Print the help text of *parser* and exit with status 1."""
    parser.print_help()
    raise SystemExit(1)
def main():
    """Parse the command line and run prep_schema().

    Accepts zero, one or two positional arguments: the input path and the
    output path.  A missing path is passed on as None.  With more than two
    arguments the help text is shown through usage().
    """
    parser = OptionParser(USAGE_TEXT)
    parser.add_option(
        "-f", "--force", action="store_true", dest="force", default=False,
        help="force overwrite without asking")
    options, args = parser.parse_args()
    arg_count = len(args)
    if arg_count > 2:
        usage(parser)
    inpath = args[0] if arg_count >= 1 else None
    outpath = args[1] if arg_count == 2 else None
    prep_schema(inpath, outpath, options)
# Run main() only when this file is executed as a script.
if __name__ == "__main__":
    # Debugging aid: uncomment the next line to start under pdb.
    #import pdb; pdb.set_trace()
    main()