grobid-client.py
import sys
import os
import io
import json
import argparse
import time
import concurrent.futures
from client import ApiClient
import ntpath
import requests
'''
This version uses the standard ProcessPoolExecutor for parallelizing the concurrent calls to the GROBID services.
Given the limits of ThreadPoolExecutor (input stored in memory, blocking Executor.map until the whole input
is acquired), it works with batches of PDF files of a size indicated in the config.json file (default is 1000 entries).
We move from one batch to the next only when the current batch is entirely processed - which is
slightly sub-optimal, but should scale better. However, acquiring a list of millions of files in directories would
also require something scalable, which is not implemented for the moment.
'''
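
# The client reads its settings from config.json. A minimal sketch of that file is shown
# below; the values are illustrative assumptions, only the keys ('grobid_server',
# 'grobid_port', 'batch_size', 'sleep_time', 'coordinates') are the ones this client
# actually reads.
#
# {
#     "grobid_server": "localhost",
#     "grobid_port": "8070",
#     "batch_size": 1000,
#     "sleep_time": 5,
#     "coordinates": ["persName", "figure", "ref", "biblStruct", "formula"]
# }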
class grobid_client(ApiClient):

    def __init__(self, config_path='./config.json'):
        self.config = None
        self._load_config(config_path)

    def _load_config(self, path='./config.json'):
        """
        Load the json configuration
        """
        with open(path) as config_file:
            self.config = json.load(config_file)

        # test if the server is up and running...
        the_url = 'http://' + self.config['grobid_server']
        if len(self.config['grobid_port']) > 0:
            the_url += ":" + self.config['grobid_port']
        the_url += "/api/isalive"
        r = requests.get(the_url)
        status = r.status_code
        if status != 200:
            print('GROBID server does not appear up and running ' + str(status))
        else:
            print("GROBID server is up and running")
    def process(self, input2, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates):
        if service == "processCitation":
            print("processCitation from TXT file")
            if input2 is not None:
                batch_size_txt = self.config['batch_size']
                # read one citation string per line
                f = open(input2, 'r')
                txt_list = f.readlines()
                f.close()
                self.process_batch_txt(txt_list, input2, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates, batch_size_txt)
            else:
                print("QUITTING: input file is not provided")
                return
                # just pass string
        else:
            batch_size_pdf = self.config['batch_size']
            pdf_files = []

            for (dirpath, dirnames, filenames) in os.walk(input2):
                for filename in filenames:
                    if filename.endswith('.pdf') or filename.endswith('.PDF'):
                        pdf_files.append(os.sep.join([dirpath, filename]))
                        if len(pdf_files) == batch_size_pdf:
                            self.process_batch(pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
                            pdf_files = []

            # last batch
            if len(pdf_files) > 0:
                self.process_batch(pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
    def process_batch_txt(self, txt_list, input2, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates, batch_size_txt):
        print(len(txt_list), "citations to process")
        if output is not None:
            filename = os.path.join(output, os.path.splitext(input2)[0] + "_" + '0' + '.tei.xml')
        else:
            filename = os.path.join(ntpath.dirname(input2), os.path.splitext(input2)[0] + "_" + '0' + '.tei.xml')

        if not force and os.path.isfile(filename):
            print(filename, "already exists, skipping... (use --force to reprocess input files)")
            return

        amount_processed = 0
        thousands = 0
        if n > 1:
            with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
                for txt_el in txt_list:
                    executor.submit(self.process_txt, filename, txt_el, input2, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
                    amount_processed += 1
                    if amount_processed % batch_size_txt == 0:
                        # switch to the next output file once the current batch is full
                        thousands += 1
                        if output is not None:
                            filename = os.path.join(output, os.path.splitext(input2)[0] + "_" + str(thousands) + '.tei.xml')
                        else:
                            filename = os.path.join(ntpath.dirname(input2), os.path.splitext(input2)[0] + "_" + str(thousands) + '.tei.xml')
                        if not force and os.path.isfile(filename):
                            print(filename, "already exists, skipping... (use --force to reprocess input files)")
                            return
        else:
            for txt_el in txt_list:
                self.process_txt(filename, txt_el, input2, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
                amount_processed += 1
                if amount_processed % batch_size_txt == 0:
                    # switch to the next output file once the current batch is full
                    thousands += 1
                    if output is not None:
                        filename = os.path.join(output, os.path.splitext(input2)[0] + "_" + str(thousands) + '.tei.xml')
                    else:
                        filename = os.path.join(ntpath.dirname(input2), os.path.splitext(input2)[0] + "_" + str(thousands) + '.tei.xml')
                    if not force and os.path.isfile(filename):
                        print(filename, "already exists, skipping... (use --force to reprocess input files)")
                        return

        # fixing XML files: wrap the accumulated <biblStruct> fragments into a valid TEI document
        xml_beg = ['<?xml version="1.0" ?>\n<TEI xmlns="http://www.tei-c.org/ns/1.0" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:mml="http://www.w3.org/1998/Math/MathML">\n\t<teiHeader>\n\t\t<fileDesc xml:id="f_1"/>\n\t</teiHeader>\n\t<text>\n\t\t<front/>\n\t\t<body/>\n\t\t<back>\n\t\t\t<listBibl>\n']
        xml_end = ['\t\t\t</listBibl>\n\t\t</back>\n\t</text>\n</TEI>']
        for j in range(0, thousands + 1):
            if output is not None:
                filename = os.path.join(output, os.path.splitext(input2)[0] + "_" + str(j) + '.tei.xml')
            else:
                filename = os.path.join(ntpath.dirname(input2), os.path.splitext(input2)[0] + "_" + str(j) + '.tei.xml')
            with open(filename) as f:
                content = f.readlines()
            content = ["\t\t\t\t" + bibls for bibls in content]
            content = xml_beg + content + xml_end
            with open(filename, 'w') as f:
                for item in content:
                    f.write("%s" % item)
    def process_txt(self, filename, txt_el, input2, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates):
        the_url = 'http://' + self.config['grobid_server']
        if len(self.config['grobid_port']) > 0:
            the_url += ":" + self.config['grobid_port']
        the_url += "/api/" + service

        # set the GROBID parameters
        the_data = {}
        if generateIDs:
            the_data['generateIDs'] = '1'
        if consolidate_header:
            the_data['consolidateHeader'] = '1'
        if consolidate_citations:
            the_data['consolidateCitations'] = '1'
        if teiCoordinates:
            the_data['teiCoordinates'] = self.config['coordinates']
        the_data['citations'] = txt_el

        res, status = self.post(
            url=the_url,
            data=the_data,
            headers={'Accept': 'text/plain'}
        )

        if status == 503:
            # server is saturated, wait and retry the same citation
            time.sleep(self.config['sleep_time'])
            return self.process_txt(filename, txt_el, input2, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
        elif status != 200:
            print('Processing failed with error ' + str(status))
            print('FileName ' + str(filename))
            with open(filename, 'a') as tei_file:
                tei_file.write("<biblStruct></biblStruct>\n")
        else:
            # writing TEI file
            try:
                with open(filename, 'a') as tei_file:
                    tei_file.write(res.text)
            except OSError:
                print("Writing resulting TEI XML file %s failed" % filename)
                pass
    def process_batch(self, pdf_files, output, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates):
        print(len(pdf_files), "PDF files to process")
        # with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        with concurrent.futures.ProcessPoolExecutor(max_workers=n) as executor:
            for pdf_file in pdf_files:
                executor.submit(self.process_pdf, pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
    def process_pdf(self, pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates):
        # check if TEI file is already produced
        # we use ntpath here to be sure it will work on Windows too
        pdf_file_name = ntpath.basename(pdf_file)
        if output is not None:
            filename = os.path.join(output, os.path.splitext(pdf_file_name)[0] + '.tei.xml')
        else:
            filename = os.path.join(ntpath.dirname(pdf_file), os.path.splitext(pdf_file_name)[0] + '.tei.xml')

        if not force and os.path.isfile(filename):
            print(filename, "already exists, skipping... (use --force to reprocess pdf input files)")
            return

        print(pdf_file)
        files = {
            'input': (
                pdf_file,
                open(pdf_file, 'rb'),
                'application/pdf',
                {'Expires': '0'}
            )
        }

        the_url = 'http://' + self.config['grobid_server']
        if len(self.config['grobid_port']) > 0:
            the_url += ":" + self.config['grobid_port']
        the_url += "/api/" + service

        # set the GROBID parameters
        the_data = {}
        if generateIDs:
            the_data['generateIDs'] = '1'
        if consolidate_header:
            the_data['consolidateHeader'] = '1'
        if consolidate_citations:
            the_data['consolidateCitations'] = '1'
        if teiCoordinates:
            the_data['teiCoordinates'] = self.config['coordinates']

        res, status = self.post(
            url=the_url,
            files=files,
            data=the_data,
            headers={'Accept': 'text/plain'}
        )

        if status == 503:
            # server is saturated, wait and retry the same PDF
            time.sleep(self.config['sleep_time'])
            return self.process_pdf(pdf_file, output, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
        elif status != 200:
            print('Processing failed with error ' + str(status))
        else:
            # writing TEI file
            try:
                with io.open(filename, 'w', encoding='utf8') as tei_file:
                    tei_file.write(res.text)
            except OSError:
                print("Writing resulting TEI XML file %s failed" % filename)
                pass
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Client for GROBID services")
    parser.add_argument("service", help="one of [processFulltextDocument, processHeaderDocument, processReferences, processCitation]")
    parser.add_argument("--input", default=None, help="path to the directory containing PDF to process")
    parser.add_argument("--output", default=None, help="path to the directory where to put the results (optional)")
    parser.add_argument("--config", default="./config.json", help="path to the config file, default is ./config.json")
    parser.add_argument("--n", default=10, help="concurrency for service usage")
    parser.add_argument("--generateIDs", action='store_true', help="generate random xml:id to textual XML elements of the result files")
    parser.add_argument("--consolidate_header", action='store_true', help="call GROBID with consolidation of the metadata extracted from the header")
    parser.add_argument("--consolidate_citations", action='store_true', help="call GROBID with consolidation of the extracted bibliographical references")
    parser.add_argument("--force", action='store_true', help="force re-processing pdf input files when tei output files already exist")
    parser.add_argument("--teiCoordinates", action='store_true', help="add the original PDF coordinates (bounding boxes) to the extracted elements")

    args = parser.parse_args()

    input_path = args.input
    config_path = args.config
    output_path = args.output

    n = 10
    if args.n is not None:
        try:
            n = int(args.n)
        except ValueError:
            print("Invalid concurrency parameter n:", args.n, "n = 10 will be used by default")
            pass

    # if output path does not exist, we create it
    if output_path is not None and not os.path.isdir(output_path):
        try:
            print("output directory does not exist but will be created:", output_path)
            os.makedirs(output_path)
        except OSError:
            print("Creation of the directory %s failed" % output_path)
        else:
            print("Successfully created the directory %s" % output_path)

    service = args.service
    generateIDs = args.generateIDs
    consolidate_header = args.consolidate_header
    consolidate_citations = args.consolidate_citations
    force = args.force
    teiCoordinates = args.teiCoordinates

    client = grobid_client(config_path=config_path)

    start_time = time.time()
    client.process(input_path, output_path, n, service, generateIDs, consolidate_header, consolidate_citations, force, teiCoordinates)
    runtime = round(time.time() - start_time, 3)
    print("runtime: %s seconds " % (runtime))