RunManager.py
import re
import os
import sys
# import shutil
import subprocess
import numpy as np
from multiprocessing import Pool
from sqlite3 import connect as sqlConnect
from PESMan import makeLogger, parseConfig
from ReadResults import parseMrciDdrNACT_Util, parseMultiEnr_Util
from ImpExp import ExportJobs, ImportJobs
# from logging.handlers import TimedRotatingFileHandler
####--------------------User-specific options----------------------------#######
process = 1              # values greater than 1 invoke the parallel implementation
calcId = 1               # calculation ID
depth = 0                # maximum depth to look for
maxJobs = 10             # total number of jobs to perform
perIterJob = 20          # number of jobs to export per iteration
readResultsStep = 500    # read results after every this many jobs
constraint = None        # geom tag constraints
includePath = False      # include geoms with `path` tags
ignoreFiles = []         # file extensions to ignore on import
deleteAfterImport = True # delete the files after a successful import
zipAfterImport = True    # archive the files saved in GeomData
stdOut = False           # also print log messages on the terminal
importOnConverge = True  # import only MCSCF-converged results
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
templ = None
gidList = []
sidList = []
iterFile = 'IterMultiJobs.dat' # saves the MCSCF iterations
#NOTES:
#=======================================================================
# 1. For simplicity and consistency only the Molpro run itself is done in parallel; all export/import is done
#    serially. This costs only a few minutes over the full run, so it is not worth complicating things.
# 2. In parallel execution, this script exports at most `perIterJob` jobs in every iteration.
# 3. When using the parallel implementation it is advisable to run Molpro itself as a single process,
#    so the script gives Molpro 1 core whenever parallel execution is opted. Modify the code below to change that.
# 4. When running the parallel version DON'T try to stop it with SIGINT (the Ctrl+C shortcut). To kill the job
#    use the `kill` utility or the SIGQUIT (Ctrl+\)/SIGTSTP (Ctrl+Z)/SIGHUP signals, whichever is applicable.
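# The defaults set above are overridden by the command-line arguments parsed below.
# Hypothetical example invocation (argument order: calcId, process, includePath, maxJobs, perIterJob):
#   python RunManager.py 1 4 1 500 20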
calcId = int(sys.argv[1])
process = int(sys.argv[2])
includePath = bool(int(sys.argv[3]))
maxJobs = int(sys.argv[4])
perIterJob = int(sys.argv[5])
ignoreFiles = [] if calcId==1 else ['wfu']
config = parseConfig()
dB = config['DataBase']['db']
pesDir = config['Directories']['pesdir']
expDir = config['Directories']['expdir']
# runDir = config['Directories']['rundir']
# impDir = config['Directories']['impdir']
logFile = config['Log']['logfile']
molInfo = config['molInfo']
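# A minimal sketch of the config this script expects, inferred only from the keys
# read above (the actual file format is defined by PESMan's parseConfig):
#   [DataBase]    db      = <path to the sqlite database>
#   [Directories] pesdir  = <GeomData directory>, expdir = <export directory>
#   [Log]         logfile = <log file path>
#   molInfo : exe, scrdir, proc, extra  (Molpro executable and run options)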
isParallel = process > 1 # use the parallel implementation
# if isParallel: molInfo['proc'] = '1'
procRun = '1' if isParallel else molInfo['proc']
# create rundir and impdir if they don't exist
# for fold in [runDir, impDir]:
# for fold in [runDir]:
#     if not os.path.exists(fold):
#         os.makedirs(fold)
# open log files
logger = makeLogger(logFile=logFile,stdout=stdOut)
if not os.path.exists(iterFile):
    iterLog = open(iterFile, 'w', buffering=1)
    iterLog.write('GeomId Iteration No.\n')
else:
    iterLog = open(iterFile, 'a', buffering=1)
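# The iteration file is a two-column plain-text table; a hypothetical entry,
# matching the '{:>6} {:>6}' row format written in parseIteration below:
#   GeomId Iteration No.
#       42     17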
logger.info('----------------------------------------------------------')
logger.debug('''Starting PESMan RunManager
----------------------------------------------------------
Host : {}
Process ID : {}
Total Jobs : {}{}
CalcId : {}
Depth : {}
Result Step : {}
Constraint : {}
Include path : {}{}
Archive : {}
Delete on Import : {}
----------------------------------------------------------
'''.format(
    os.uname()[1], os.getpid(), maxJobs,
    "\n Parallel processes : {}".format(process) if process > 1 else '',
    calcId, depth, readResultsStep, constraint, includePath,
    "\n Ignore Files : {}".format(ignoreFiles) if ignoreFiles else '',
    deleteAfterImport, zipAfterImport
))
def parseIteration(baseName):
    # calc ids other than 1 are not multi jobs, so no iteration check is required
    if calcId != 1: return True
    # if this returns `True` then the job is properly successful
    outFile = baseName + '.out'
    gId = re.findall(r'geom(\d+)-', outFile)[0]  # parse the geom id, used only for the iteration log
    try:
        with open(outFile) as f: txt = f.read()
        val = re.findall(r'\s*(\d+).*\n\n\s*\*\* WVFN \*\*\*\*', txt)[0]  # parse the iteration number
        # val = re.findall(r'\s*(\d+).*\n\n\s* CONVERGENCE REACHED! ', txt)[0]  # for molpro 2024
        val = int(val)
    except Exception:
        import traceback
        logger.info('Failed to parse MCSCF iteration. %s' % traceback.format_exc())
        return False
    iterLog.write('{:>6} {:>6}\n'.format(gId, val))
    if importOnConverge and val > 38:  # iteration count at the limit means MCSCF did not converge, so skip the import
        logger.info('Number of MCSCF iterations for {}: {}. Skipping import.'.format(baseName, val))
        return False
    logger.info('Number of MCSCF iterations for {}: {}'.format(baseName, val))
    return True
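# A hypothetical Molpro output fragment of the shape the WVFN regex above matches,
# with the captured group being the final MCSCF iteration number (here 17):
#   17  ...   <last row of the MCSCF iteration table>
#
#   ** WVFN ****  ...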
def utilityFunc(arg):  # runs one individual job directory inside the rundir
    thisRunDir, baseName = arg
    os.chdir(thisRunDir + '/' + baseName)
    logger.info("Running Molpro Job {} ...".format(baseName))
    exitcode = subprocess.call(
        # "-W" and "." are passed as separate tokens so the work-directory flag is parsed unambiguously
        [molInfo['exe'], "-d", molInfo['scrdir'], "-W", ".", "-n", procRun, baseName + '.com'] + molInfo['extra']
    )
    if exitcode == 0:
        logger.info("\nJob Successful for {}".format(baseName))
        # NOTE: if the job crossed the maximum iteration count, the calc_ file won't be renamed, i.e. it stays marked as failed
        if parseIteration(baseName):
            file = "{}.calc".format(baseName)
            os.rename(file + '_', file)  # rename the .calc_ file so that it can be imported
    else:
        logger.info("\nJob Failed {} \n\n".format(baseName))
    os.chdir(mainDirectory)
    # return exitcode == 0  # let the main loop know this job is successful
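# For reference, the subprocess call above assembles a command of the form
# (paths and job name hypothetical; real values come from molInfo in the config):
#   /path/to/molpro -d /path/to/scratch -W . -n 1 geom42-multi.com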
def haltedExport(dB):
    with sqlConnect(dB) as con:
        cur = con.cursor()
        cur.execute('select id from geometry where id not in (select geomid from expcalc) and id not in (select geomid from calc)')
        ntx = [i for (i,) in cur]  # geometry ids never exported or calculated
        if len(ntx) == 0:
            return False
        cur.execute('select geomid from calc where calcid=1')
        imp = [i for (i,) in cur]  # geometry ids already imported for calcid 1
        mat = np.abs(np.subtract.outer(ntx, imp))  # pairwise |id difference|
        indx, ind = np.where(mat == mat.min())
        return ntx[indx[0]], imp[ind[0]]  # closest (to-export, imported) pair
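# Sketch of the nearest-neighbour pairing above, with hypothetical ids:
#   ntx = [5, 9] (never exported) and imp = [4, 7] (imported) give
#   |outer difference| = [[1, 2], [5, 2]]; the minimum 1 sits at index (0, 0),
#   so (5, 4) is returned: export geom 5 seeded from imported geom 4.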
def readResult():
    try:
        logger.info("\nReading results from database.")
        if calcId == 1:
            parseMultiEnr_Util()
        else:
            parseMrciDdrNACT_Util()
    except Exception as e:
        logger.info("\nError reading results from database.\n{}".format(e))
if __name__ == "__main__":
    jobCounter = 0  # counts the jobs done so far
    mainDirectory = os.getcwd()
    if isParallel: pool = Pool(processes=process)
    try:
        while True:
            if isParallel:
                # export up to `perIterJob` jobs, keeping the total number of jobs at exactly `maxJobs`
                thisJobs = perIterJob if maxJobs - jobCounter > perIterJob else maxJobs - jobCounter
                logger.debug(' Starting Job No : {}-{}\n{}'.format(jobCounter + 1, jobCounter + thisJobs, '*'*75))
            else:
                thisJobs = 1  # serial run: always export just one job
                logger.debug(' Starting Job No : {}\n{}'.format(jobCounter + 1, '*'*75))
            try:
                # export `thisJobs` jobs to run them (in parallel, if opted)
                thisExpDir, exportId, jobDirs = ExportJobs(dB, calcId, thisJobs, process, expDir, pesDir, templ,
                                                           gidList, sidList, depth, constraint, includePath, molInfo, isParallel, logger)
            except AssertionError:
                if includePath and calcId == 1:  # for simplicity, only try this for multi calculations with include-path on
                    gids = haltedExport(dB)
                    if gids:
                        logger.info('{0}\nBroken neighbour list detected, attempting brute-force export\n{0}\n'.format('='*75))
                        thisExpDir, exportId, jobDirs = ExportJobs(dB, calcId, 1, 1, expDir, pesDir, templ,
                                                                   [gids[0]], [gids[1]], depth, constraint, includePath, molInfo, isParallel, logger)
                    else:
                        raise AssertionError('No more geometry is available to export.')
                else:
                    raise
            jobCounter += len(jobDirs)  # increment by the actual count, in case fewer than `perIterJob` jobs were exported
            thisRunDir = thisExpDir  # the job is run in the same folder
            thisImpDir = thisRunDir  # and imported from the same folder
            if isParallel:
                pool.map(utilityFunc, [[thisRunDir, i] for i in jobDirs])
            else:
                for i in jobDirs: utilityFunc([thisRunDir, i])
            logger.info('')
            expFile = thisImpDir + '/export.dat'
            ImportJobs(dB, process, expFile, pesDir, ignoreFiles, deleteAfterImport, zipAfterImport, logger)
            if not jobCounter % readResultsStep: readResult()
            if jobCounter >= maxJobs: break
        if isParallel: pool.close()
        readResult()
        logger.info("Total number of successful jobs done : {}\n{}\n".format(jobCounter, '*'*75))
    except AssertionError as e:
        logger.info('PESMan RunManager Stopped. %s' % e)
    except Exception:
        logger.exception('PESMan RunManager failed')