-
Notifications
You must be signed in to change notification settings - Fork 0
/
htcondor_utilities.py
248 lines (198 loc) · 7.53 KB
/
htcondor_utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
"""
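htcondor_utilities.py
-- helper functions to connect to the OSG node (through an SSH gateway)
-- job submission
-- job status report
configuration:
.env contains the ssh connection parameters and remote paths
put in the values for the following keys:
- userA,sshA,keyA,passA (gateway host: user, host, private key path, key passphrase)
- userB,sshB,keyB,passB (target host, reached through the gateway)
- output, log, script (remote output directory, remote log directory, remote submission script)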
"""
import os
import sys
from dotenv import load_dotenv
from fabric import Connection
import re
from datetime import datetime
# Load the settings from the .env file into the process environment so the
# os.getenv() calls in the functions below can read them.
load_dotenv()
def _host_settings(suffix):
    """Return ``fabric.Connection`` keyword arguments for one host.

    Reads ``ssh<suffix>`` (host), ``user<suffix>``, ``key<suffix>`` (private
    key path, ``~`` expanded) and ``pass<suffix>`` (key passphrase, may be
    unset) from the environment.

    Raises:
        RuntimeError: if the host or key-path variable is unset or empty.
    """
    host = os.getenv(f"ssh{suffix}")
    key_path = os.getenv(f"key{suffix}")
    for name, value in ((f"ssh{suffix}", host), (f"key{suffix}", key_path)):
        if not value:
            # Fail with the variable's name instead of an opaque TypeError
            # from os.path.expanduser(None) or a connection without a host.
            raise RuntimeError(f"environment variable {name!r} is not set (check .env)")
    return {
        "host": host,
        "user": os.getenv(f"user{suffix}"),
        "connect_kwargs": {
            "key_filename": os.path.expanduser(key_path),
            "passphrase": os.getenv(f"pass{suffix}"),
        },
    }


def get_connection():
    """Build the connection settings for the gateway and the target host.

    The gateway uses the ``*A`` variables (``sshA``, ``userA``, ``keyA``,
    ``passA``) and the target host the ``*B`` variables.

    Returns:
        ``[gateway, target_host]``: two dicts of ``fabric.Connection``
        keyword arguments (``host``, ``user``, ``connect_kwargs``).

    Raises:
        RuntimeError: if ``sshA``/``sshB`` or ``keyA``/``keyB`` is not set.
    """
    return [_host_settings("A"), _host_settings("B")]
def run_ssh_cmd(cmd_str):
    """Run a shell command on the target host, connecting through the gateway.

    Args:
        cmd_str: command line to execute on the target host.

    Returns:
        The result object of Fabric's ``conn.run``; its output is captured
        rather than echoed locally (``hide=True``).
    """
    gateway, target_host = get_connection()
    # Manage the gateway in its own ``with`` so it is closed explicitly after
    # the target connection, instead of relying on the target's close() to
    # tear it down.
    with Connection(**gateway) as gateway_conn:
        with Connection(**target_host, gateway=gateway_conn) as conn:
            return conn.run(cmd_str, hide=True)
def get_file(remote_path, local_path):
    """Download a file from the target host, connecting through the gateway.

    Args:
        remote_path: path of the file on the target host.
        local_path: local destination path.

    Returns:
        The result object of Fabric's ``conn.get``; ``get_output`` reads its
        ``local`` attribute.
    """
    gateway, target_host = get_connection()
    # Close the gateway explicitly once the target connection is closed.
    with Connection(**gateway) as gateway_conn:
        with Connection(**target_host, gateway=gateway_conn) as conn:
            return conn.get(remote_path, local_path)
def extract_numbers_from_line(line):
    """Return every run of decimal digits in ``line`` as an int, in order.

    >>> extract_numbers_from_line("Total for query: 116 jobs; 0 completed")
    [116, 0]
    """
    return [int(digits) for digits in re.findall(r"\d+", line)]
def htcondor_status(condor_q_output=None, owner="ehtbot"):
    """Summarise one batch of ``owner``'s jobs from a ``condor_q`` report.

    Example report::

        -- Schedd: ospool-eht.chtc.wisc.edu : <128.105.68.10:9618?... @ 02/27/24 10:03:25
        OWNER BATCH_NAME SUBMITTED DONE RUN IDLE HOLD TOTAL JOB_IDS
        ehtbot ID: 70 2/27 09:31 184 112 _ 4 300 70.0-272
        Total for query: 116 jobs; 0 completed, 0 removed, 0 idle, 112 running, 4 held, 0 suspended

    Count columns are located by name through the ``OWNER ...`` header, not
    by fixed position, because the column set (e.g. ``HOLD``) is not
    guaranteed to be the same in every report and ``BATCH_NAME`` may span
    one word or several (``ID: 70``).

    Args:
        condor_q_output: text of a ``condor_q`` report to parse. When
            ``None`` (the default), ``condor_q`` is run on the remote host.
        owner: job owner whose row is reported; must equal the row's first
            column exactly.

    Returns:
        ``None`` if ``owner`` has no row. Otherwise a dict with ``ID`` and
        ``SUBMITTED`` (strings) and ``DONE``, ``RUN``, ``IDLE``, ``HOLD``,
        ``TOTAL`` (ints; ``_`` and absent columns count as 0). When the
        owner has several batches, only the first row is used.

    Raises:
        ValueError: if the owner's row cannot be parsed.
    """
    if condor_q_output is None:
        condor_q_output = run_ssh_cmd("condor_q").stdout

    rows = [line.split() for line in condor_q_output.splitlines()]
    owner_row = next((tokens for tokens in rows if tokens[:1] == [owner]), None)
    if owner_row is None:
        return None

    header = next((tokens for tokens in rows if tokens[:1] == ["OWNER"]), None)
    if header and "SUBMITTED" in header and "JOB_IDS" in header:
        count_names = header[header.index("SUBMITTED") + 1:header.index("JOB_IDS")]
    else:
        # No usable header: assume the layout of the example above.
        count_names = ["DONE", "RUN", "IDLE", "HOLD", "TOTAL"]

    # SUBMITTED is "month/day hour:minute". Locating it splits the row into
    # BATCH_NAME (before it) and the count columns (after it).
    for pos in range(1, len(owner_row) - 1):
        if (re.fullmatch(r"\d{1,2}/\d{1,2}", owner_row[pos])
                and re.fullmatch(r"\d{1,2}:\d{2}", owner_row[pos + 1])):
            break
    else:
        raise ValueError(f"cannot find SUBMITTED time in condor_q row: {owner_row}")

    count_tokens = owner_row[pos + 2:pos + 2 + len(count_names)]
    if len(count_tokens) != len(count_names):
        raise ValueError(f"missing count columns in condor_q row: {owner_row}")
    # condor_q prints "_" for a zero count.
    counts = {
        name: 0 if token == "_" else int(token)
        for name, token in zip(count_names, count_tokens)
    }

    batch = owner_row[1:pos]
    # Default batch names look like "ID: 70"; report just the cluster id then.
    batch_id = batch[1] if len(batch) > 1 and batch[0] == "ID:" else " ".join(batch)

    return {
        "ID": batch_id,
        "SUBMITTED": f"{owner_row[pos]} {owner_row[pos + 1]}",
        "DONE": counts.get("DONE", 0),
        "RUN": counts.get("RUN", 0),
        "IDLE": counts.get("IDLE", 0),
        "HOLD": counts.get("HOLD", 0),
        "TOTAL": counts.get("TOTAL", 0),
    }
def check_htcondor_status():
    """Print a text progress report for the current HTCondor batch.

    Shows the batch id, total job count and submit time, the percentage of
    jobs done, and one ``*`` bar per state (done / run / idle / hold).
    Prints "no job is running!" when ``htcondor_status()`` returns ``None``.
    """
    status = htcondor_status()
    if status is None:
        print("no job is running!")
        return
    print(f"JobID: {status['ID']}, Total: {status['TOTAL']}, Submit Time: {status['SUBMITTED']}")
    current_time = datetime.now().strftime("%m/%d %H:%M")
    # A TOTAL of 0 would otherwise abort the report with ZeroDivisionError.
    total = status['TOTAL']
    progress = status['DONE'] / total * 100.0 if total else 0.0
    print(f"status report {progress:.2f}% done @ {current_time}")
    for label, key in (("Done", "DONE"), ("Run", "RUN"), ("Idle", "IDLE"), ("Hold", "HOLD")):
        print(f"{label} - {status[key]}: ", "*" * status[key])
def check_output():
    """List the ``.h5`` files in the remote output directory.

    The directory is taken from the ``output`` environment variable and
    listed with ``ls`` on the target host via ``run_ssh_cmd``.

    Returns:
        Names (not full paths) of the entries ending in ``.h5``, in the order
        ``ls`` printed them.

    Raises:
        RuntimeError: if ``output`` is not set.
    """
    outputdir = os.getenv("output")
    if not outputdir:
        # Without this the remote command would silently become "ls None".
        raise RuntimeError("environment variable 'output' is not set (check .env)")
    response = run_ssh_cmd(f"ls {outputdir}")
    # endswith rather than "in": skip names such as "x.h5.part" that only
    # contain ".h5"; splitlines also copes with "\r\n" line endings.
    return [name for name in response.stdout.splitlines() if name.endswith(".h5")]
def get_output(outputfile, localpath="."):
    """Download one file from the remote output directory.

    Args:
        outputfile: file name inside the remote directory given by the
            ``output`` environment variable (e.g. an entry returned by
            ``check_output()``).
        localpath: local directory to save the file into (default: the
            current directory).

    Returns:
        The local path reported by the transfer (it is also printed).

    Raises:
        RuntimeError: if ``output`` is not set.
    """
    # The remote host is a POSIX system (it runs ls/condor_q), so its paths
    # use "/" regardless of the local OS; os.path.join would use "\\" on Windows.
    import posixpath

    outputdir = os.getenv("output")
    if not outputdir:
        raise RuntimeError("environment variable 'output' is not set (check .env)")
    remotefile = posixpath.join(outputdir, outputfile)
    localfile = os.path.join(localpath, outputfile)
    response = get_file(remotefile, localfile)
    print(response.local)
    print("File transfer successful!")
    return response.local
def check_errorlog():
    """List the ``.err`` files in the remote log directory.

    The directory is taken from the ``log`` environment variable and listed
    with ``ls`` on the target host via ``run_ssh_cmd``.

    Returns:
        Names (not full paths) of the entries ending in ``.err``, in the order
        ``ls`` printed them.

    Raises:
        RuntimeError: if ``log`` is not set.
    """
    logdir = os.getenv("log")
    if not logdir:
        # Without this the remote command would silently become "ls None".
        raise RuntimeError("environment variable 'log' is not set (check .env)")
    response = run_ssh_cmd(f"ls {logdir}")
    # endswith rather than "in": skip names that merely contain ".err".
    return [name for name in response.stdout.splitlines() if name.endswith(".err")]
def job_submission(paras, dryrun=False):
    """Submit a batch of jobs described by ``paras`` on the remote host.

    Args:
        paras: dict with the keys ``ehtdata``, ``rr``, ``tva`` and ``rdm``.
            Each value is passed, in that order, as one command-line argument
            to the remote script named by the ``script`` environment variable
            (see ``run_testjob`` for example values).
        dryrun: if true, only build the job collection locally with
            ``alljobs_dict`` and print its size; nothing is submitted.

    Raises:
        RuntimeError: if ``script`` is not set (checked only when submitting).
    """
    import shlex

    from jobsubmission.eht_submit import alljobs_dict

    all_jobs = alljobs_dict(paras)
    if dryrun:
        print("total jobs: ", len(all_jobs))
        print("ready to submit")
        return
    script = os.getenv("script")
    if not script:
        raise RuntimeError("environment variable 'script' is not set (check .env)")
    # Quote each value so the remote shell passes it as exactly one argument
    # even if it contains spaces or metacharacters. Values made only of
    # letters, digits and characters like , : . + - are left unchanged.
    args = " ".join(shlex.quote(str(paras[key])) for key in ("ehtdata", "rr", "tva", "rdm"))
    output = run_ssh_cmd(f"{script} {args}")
    print(output.stdout)
def run_testjob(submit=False):
    """Print the parameters of the built-in example job, then hand it to ``job_submission``.

    The submission is a dry run unless ``submit`` is truthy.
    """
    job_paras = {
        "ehtdata": "torus.out0.05992.h5,torus.out0.05993.h5,torus.out0.05994.h5,torus.out0.05995.h5,torus.out0.05996.h5",
        "rr": "10:100:10",
        "tva": "30,50,60,70,80,90",
        "rdm": "4.266338570441294e+17",
    }
    # (printed label, parameter key) pairs, in display order.
    labelled_keys = (
        ("Ehtdata: ", "ehtdata"),
        ("Rhigh-Ratio: ", "rr"),
        ("Theta-Viewing-Angle: ", "tva"),
        ("Rho0-Density-Normalization: ", "rdm"),
    )
    print("A test case with the following parameters:")
    for label, key in labelled_keys:
        print(label, job_paras[key])
    job_submission(job_paras, dryrun=not submit)
def main():
    """Exercise the helpers against the configured hosts.

    Runs these steps in order:

    * prints the remote ``hostname`` output and the batch status;
    * dry-runs the example job;
    * prints the progress report and the number of ``.h5`` outputs;
    * downloads the first output file, if there is one.
    """
    output = run_ssh_cmd("hostname")
    print("stdout:", output.stdout)
    print("stderr:", output.stderr)
    status = htcondor_status()
    print(status)
    run_testjob(submit=False)
    check_htcondor_status()
    h5list = check_output()
    print(f"Total output: {len(h5list)}")
    # Download the first output only if the jobs have produced any yet;
    # indexing an empty list would raise IndexError.
    if h5list:
        get_output(h5list[0])
    else:
        print("no output file to download yet.")
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)