-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlaunch_mc.py
More file actions
executable file
·343 lines (278 loc) · 12.2 KB
/
launch_mc.py
File metadata and controls
executable file
·343 lines (278 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
#!/usr/bin/env python
# Paul Mattione
# Built off of hdswif by Kei Moriya
#
# SWIF DOCUMENTATION:
# https://scicomp.jlab.org/docs/swif
# https://scicomp.jlab.org/docs/swif-cli
from optparse import OptionParser
import os.path
import rcdb
import sys
import time
import glob
import re
import time
from subprocess import Popen, PIPE
# Module-wide verbosity flag: read_config(), add_job() and main() print extra
# diagnostics when it is True. main() overwrites it from the -v command-line option.
VERBOSE = True
####################################################### TRY COMMAND ######################################################
def try_command(command, sleeptime = 5, max_tries = None):
    """Run an OS command, retrying until it exits with status 0.

    command   -- the command line as a single string. It is split on whitespace and
                 executed without a shell, so quoting and arguments containing
                 spaces are not supported.
    sleeptime -- number of seconds to wait between two attempts.
    max_tries -- maximum number of attempts. None (the default) keeps retrying
                 forever. At least one attempt is always made.

    The captured stdout of every attempt is printed. stderr is not piped, so it goes
    wherever this process's stderr goes.

    Returns the exit code of the last attempt: 0 on success, non-zero only when
    max_tries attempts have all failed.
    """
    tries = 0
    while True:
        process = Popen(command.split(), stdout=PIPE)
        output = process.communicate()[0] # [0] is stdout; stderr was not piped
        print(output)
        return_code = process.returncode
        tries += 1

        if return_code == 0:
            return return_code # successful: leave

        if (max_tries is not None) and (tries >= max_tries):
            print('giving up after ' + str(tries) + ' tries, exit code ' + str(return_code))
            return return_code

        # sleep for a few seconds between tries
        print('sleeping for ' + str(sleeptime) + ' sec...')
        time.sleep(sleeptime)
####################################################### READ CONFIG ######################################################
def read_config(CONFIG_FILENAME):
    """Read the user job-config file into a dictionary of strings.

    File format, one setting per line:
        KEY value
        KEY "value with spaces"
    Empty lines and lines whose first token starts with '#' are ignored. For an
    unquoted value only the first token after the key is used. A value that starts
    with a double quote is read up to the token that ends with a double quote, and
    the surrounding quotes are removed (also for a single-word quoted value).

    Values may refer to other keys as [KEY]; those placeholders are replaced by the
    value of that key until none are left. Example:
        OUTPUT_TOPDIR /volatile/halld/test/RunPeriod-[RUNPERIOD]/ver[VERSION]
    depends on the config parameters RUNPERIOD and VERSION.

    Returns the dictionary (empty if the file holds no settings).
    Raises ValueError if a key has no value, a quoted value is never closed, or a
    value ends up referring to its own key (circular dependency).
    """
    config_dict = {}
    with open(CONFIG_FILENAME, 'r') as infile_config:
        for line in infile_config:
            tokens = line.split()

            # Ignore empty lines, and lines that begin with #
            if (len(tokens) == 0) or (tokens[0][0] == '#'):
                continue
            if len(tokens) < 2:
                raise ValueError("config key " + tokens[0] + " has no value in " + CONFIG_FILENAME)

            key = str(tokens[0])
            value = tokens[1]

            # if is string in quotes (e.g. RCDB query) continue reading until end-quote
            read_index = 2
            while (value[0] == "\"") and (value[-1:] != "\""):
                if read_index >= len(tokens):
                    raise ValueError("unterminated quoted value for config key " + key + " in " + CONFIG_FILENAME)
                value += " " + tokens[read_index]
                read_index += 1

            # now, strip leading and tail quotation marks (if present)
            if (len(value) >= 2) and (value[0] == "\"") and (value[-1] == "\""):
                value = value[1:-1]

            # Add new key/value pair into config_dict
            config_dict[key] = value
            if(VERBOSE == True):
                print("Job Config key, value = " + key + " " + value)

    # Expand [KEY] placeholders. A substituted value can itself contain placeholders,
    # so keep making passes over the dictionary until a pass changes nothing.
    replaced = True
    while replaced:
        replaced = False
        for key in list(config_dict):
            value = config_dict[key]
            for other_key in list(config_dict):
                placeholder = '[' + other_key + ']'
                if placeholder not in value:
                    continue
                # A value containing its own placeholder can never be fully expanded;
                # any dependency cycle eventually shows up in this form.
                if other_key == key:
                    raise ValueError("circular reference to [" + key + "] in " + CONFIG_FILENAME)
                value = value.replace(placeholder, config_dict[other_key])
                replaced = True
            config_dict[key] = value

    return config_dict
##################################################### VALIDATE CONFIG ####################################################
def validate_config(config_dict):
    """Abort the program unless all mandatory config keys are present.

    The key groups are checked in the order listed below. For the first group with
    a missing key, that group's error message is printed and the program exits
    with status 1. Returns None when every key is present.
    """
    # (error message, keys that must all be present)
    required_groups = (
        # JOB ACCOUNTING
        ("ERROR: JOB ACCOUNTING NOT FULLY SPECIFIED IN CONFIG FILE. ABORTING",
         ("PROJECT", "TRACK", "OS")),
        # JOB RESOURCES
        ("ERROR: JOB RESOURCES NOT FULLY SPECIFIED IN CONFIG FILE. ABORTING",
         ("NCORES", "DISK", "RAM", "TIMELIMIT")),
        # WORKFLOW DEFINITION
        ("ERROR: WORKFLOW DEFINITION NOT FULLY SPECIFIED IN CONFIG FILE. ABORTING",
         ("WORKFLOW",)),
        # JOB, SCRIPT CONTROL
        ("ERROR: JOB, SCRIPT CONTROL NOT FULLY SPECIFIED IN CONFIG FILE. ABORTING",
         ("ENVFILE", "SCRIPTFILE")),
        # FILE INPUT, OUTPUT BASE DIRECTORIES
        ("ERROR: FILE INPUT, OUTPUT BASE DIRECTORIES NOT FULLY SPECIFIED IN CONFIG FILE. ABORTING",
         ("INDATA_TOPDIR", "OUTDIR_LARGE", "OUTDIR_SMALL")),
    )

    for error_message, needed_keys in required_groups:
        if all(needed in config_dict for needed in needed_keys):
            continue
        print(error_message)
        sys.exit(1)
####################################################### FIND FILES #######################################################
def find_files(INDATA_DIR, FORMATTED_RUN, FORMATTED_FILE):
    """Return the paths in INDATA_DIR that glob-match the given run (and file) number.

    INDATA_DIR     -- directory to search (not recursive)
    FORMATTED_RUN  -- run number as it appears in the file names
    FORMATTED_FILE -- file number or glob fragment; "*" means "any file of the run"

    Note: This won't work if the file names don't contain ".", or if there are
    directories that DO contain ".".
    """
    if FORMATTED_FILE == "*":
        # No specific file #: just require the run # right after the bggen prefix
        name_pattern = 'dana_rest_bggen_' + FORMATTED_RUN + '*.*'
    else:
        # Specific file #: any prefix, then the run #, then "_" and the file #
        name_pattern = '*' + FORMATTED_RUN + '*_*' + FORMATTED_FILE + '*.*'
    return glob.glob(INDATA_DIR + '/' + name_pattern)
######################################################## ADD JOB #########################################################
def find_num_threads(JANA_CONFIG_FILENAME):
    """Return the NTHREADS value from a JANA config file, as a string.

    Empty lines and lines whose first token starts with '#' are ignored. Only the
    first line whose key is NTHREADS is considered. "1" is returned when there is
    no such line or when that line carries no value.
    """
    num_threads = "1"

    # Read in JANA config file; the with-block closes it even on early exit
    with open(JANA_CONFIG_FILENAME, 'r') as infile_config:
        for line in infile_config:
            tokens = line.split()

            # Ignore empty lines, and lines that begin with #
            if (len(tokens) == 0) or (tokens[0][0] == '#'):
                continue

            # Save #-threads if correct key
            if tokens[0] != "NTHREADS":
                continue
            if len(tokens) > 1:
                num_threads = tokens[1]
            break

    return num_threads
def add_job(WORKFLOW, FILEPATH, config_dict):
    """Build and submit the "swif add-job" command for one input file.

    WORKFLOW    -- name of the swif workflow the job is added to
    FILEPATH    -- full path of the input file; its name must have the form
                   */*_RUNNO_FILENO.* or */*_RUNNO.* (6-digit run #, 3-digit file #)
    config_dict -- job configuration as returned by read_config(). PROJECT, TRACK,
                   OS, NCORES, DISK, RAM, TIMELIMIT, SCRIPTFILE, ENVFILE,
                   OUTDIR_LARGE and OUTDIR_SMALL are read unconditionally;
                   CACHE_PIN_DAYS, JANA_CONFIG, ROOT_SCRIPT, TREE_NAME,
                   SELECTOR_NAME, WEBDIR_SMALL and WEBDIR_LARGE are optional.

    A file whose name does not match either form is skipped with a warning.
    Otherwise the log directory is created and the job is submitted, both through
    try_command(). Returns None.
    """
    # EXTRACT PATH, RUNNO, & FILE #: ASSUME THE FORM IS EITHER */*_RUNNO_FILENO.* OR */*_RUNNO.*
    # The "." before the extension is escaped so that only a literal dot is accepted.
    match = re.search(r"(.*)/(.*)_(\d{6})_(\d{3})\.(.*)", FILEPATH)
    if(match is not None):
        INDATA_DIR, PREFIX, RUNNO, FILENO, EXTENSION = match.groups()
    else: # Try with no file #
        match = re.search(r"(.*)/(.*)_(\d{6})\.(.*)", FILEPATH)
        if(match is None):
            print("WARNING: FILE " + FILEPATH + " DOESN'T MATCH EXPECTED NAME FORMAT. SKIPPING.")
            return
        INDATA_DIR, PREFIX, RUNNO, EXTENSION = match.groups()
        FILENO = "-1" # marks "no file #"; also passed on to the job script below

    if(VERBOSE == True):
        print("FILEPATH, COMPONENTS: " + FILEPATH + " " + INDATA_DIR + " " + PREFIX + " " + RUNNO + " " + FILENO + " " + EXTENSION)

    has_file_number = (FILENO != "-1")

    # PREPARE NAMES
    DATE = time.strftime("%Y-%m-%d")
    STUBNAME = RUNNO + "_" + FILENO if has_file_number else RUNNO
    FILENAME = PREFIX + "_" + STUBNAME + "." + EXTENSION
    # The date is appended only when the workflow name does not contain "ver"
    if "ver" not in WORKFLOW:
        JOBNAME = WORKFLOW + "_" + STUBNAME + "_" + DATE
    else:
        JOBNAME = WORKFLOW + "_" + STUBNAME

    # SETUP OTHER VARIABLES:
    INPUTDATA_TYPE = "mss" if INDATA_DIR.startswith("/mss/") else "file"
    CACHE_PIN_DAYS = config_dict["CACHE_PIN_DAYS"] if ("CACHE_PIN_DAYS" in config_dict) else "0"
    JANA_CONFIG = config_dict["JANA_CONFIG"] if ("JANA_CONFIG" in config_dict) else "NA"
    NUM_THREADS = find_num_threads(JANA_CONFIG) if ("JANA_CONFIG" in config_dict) else "1"

    # SETUP LOG DIRECTORY FOR SLURM: one sub-directory per run when files are numbered
    LOG_DIR = config_dict["OUTDIR_SMALL"] + "/log"
    if has_file_number:
        LOG_DIR += "/" + RUNNO
    make_log_dir = "mkdir -p " + LOG_DIR
    try_command(make_log_dir)
    if(VERBOSE == True):
        print("LOG DIRECTORY " + LOG_DIR + " CREATED")

    # CREATE ADD-JOB COMMAND
    # job
    add_command = "swif add-job -slurm -workflow " + WORKFLOW + " -name " + JOBNAME
    # accounting
    add_command += " -project " + config_dict["PROJECT"] + " -track " + config_dict["TRACK"] + " -os " + config_dict["OS"]
    # resources
    add_command += " -cores " + config_dict["NCORES"] + " -disk " + config_dict["DISK"] + " -ram " + config_dict["RAM"] + " -time " + config_dict["TIMELIMIT"]
    # inputs
    add_command += " -input " + FILENAME + " " + INPUTDATA_TYPE + ":" + INDATA_DIR + "/" + FILENAME
    # stdout, stderr
    add_command += " -stdout " + LOG_DIR + "/stdout." + STUBNAME + ".out"
    add_command += " -stderr " + LOG_DIR + "/stderr." + STUBNAME + ".err"
    # tags
    add_command += " -tag run_number " + RUNNO + " -tag num_threads " + NUM_THREADS
    # file # tag
    if has_file_number:
        add_command += " -tag file_number " + FILENO
    # command + arguments
    add_command += " " + config_dict["SCRIPTFILE"] + " " + config_dict["ENVFILE"] + " " + FILENAME + " " + JANA_CONFIG
    # command arguments continued
    add_command += " " + config_dict["OUTDIR_LARGE"] + " " + config_dict["OUTDIR_SMALL"] + " " + RUNNO + " " + FILENO + " " + CACHE_PIN_DAYS
    # optional command arguments (positional, so their order matters to the job script)
    if('ROOT_SCRIPT' in config_dict):
        add_command += " " + config_dict["ROOT_SCRIPT"]
    if('TREE_NAME' in config_dict):
        add_command += " " + config_dict["TREE_NAME"]
    if('SELECTOR_NAME' in config_dict):
        add_command += " " + config_dict["SELECTOR_NAME"]
        # NCORES is passed along only together with SELECTOR_NAME
        add_command += " " + config_dict["NCORES"]
    if('WEBDIR_SMALL' in config_dict):
        add_command += " " + config_dict["WEBDIR_SMALL"]
    if('WEBDIR_LARGE' in config_dict):
        add_command += " " + config_dict["WEBDIR_LARGE"]

    if(VERBOSE == True):
        print("job add command is \n" + str(add_command))

    # ADD JOB
    try_command(add_command)
########################################################## MAIN ##########################################################
def main(argv):
    """Parse the command line and add one swif job per matching input file.

    argv -- command-line arguments without the program name:
            job_configfile minrun maxrun [-f file_num] [-v True]

    With fewer than three positional arguments the help text is printed and the
    function returns. Non-integer minrun/maxrun end the program through the
    parser's usage error. Otherwise the config file is read and validated, and for
    every run number in [minrun, maxrun] (restricted to the runs selected by
    RCDB_QUERY, if the config defines one) add_job() is called for each input file
    found in INDATA_TOPDIR.

    Side effect: sets the module-level VERBOSE flag from the -v option.
    """
    global VERBOSE # so can modify here

    # PARSER
    parser_usage = "launch.py job_configfile minrun maxrun\n\n"
    parser_usage += "optional: -f file_num: file_num must be 3 digits, with leading 0's if necessary)\n"
    parser_usage += " but, it can be a search string for glob (e.g. first 5 files: -f '00[0-4]' (MUST include quotes!))\n\n"
    parser_usage += "optional: -v True: verbose output\n\n"
    parser = OptionParser(usage = parser_usage)

    # PARSER OPTIONS
    parser.add_option("-f", "--file", dest="file", help="specify file(s) to run over")
    parser.add_option("-v", "--verbose", dest="verbose", help="verbose output")

    # GET ARGUMENTS
    (options, args) = parser.parse_args(argv)
    if(len(args) < 3):
        parser.print_help()
        return

    # SET INPUT VARIABLES
    JOB_CONFIG_FILE = args[0]
    try:
        MINRUN = int(args[1])
        MAXRUN = int(args[2])
    except ValueError:
        parser.error("minrun and maxrun must be integers") # prints usage, exits with status 2
    # -v takes a value, so "-v False" (or 0/no/off) has to switch verbose output off
    VERBOSE = (options.verbose is not None) and (options.verbose.strip().lower() not in ("", "0", "false", "no", "off"))
    INPUT_FILE_NUM = options.file if(options.file) else "*" #must be three digits, with leading 0's if necessary

    # READ CONFIG
    config_dict = read_config(JOB_CONFIG_FILE)
    validate_config(config_dict)

    # SET CONTROL VARIABLES
    WORKFLOW = config_dict["WORKFLOW"]
    RCDB_QUERY = config_dict["RCDB_QUERY"] if ("RCDB_QUERY" in config_dict) else ""
    INDATA_TOPDIR = config_dict["INDATA_TOPDIR"] if ("INDATA_TOPDIR" in config_dict) else ""

    # GET THE LIST OF GOOD RUNS
    # The database is only contacted when there is a query to run.
    db = None
    good_runs = []
    if(VERBOSE == True):
        print("RCDB_QUERY = " + RCDB_QUERY)
    if(RCDB_QUERY != ""):
        db = rcdb.RCDBProvider("mysql://rcdb@hallddb/rcdb")
        good_runs = db.select_runs(RCDB_QUERY, MINRUN, MAXRUN)
    if(VERBOSE == True):
        print(str(len(good_runs)) + " good runs in range: " + str(MINRUN) + " - " + str(MAXRUN))

    # FIND & ADD JOBS
    for RUN in range(MINRUN, MAXRUN + 1):

        # See if is good run (every run counts as good when there is no query)
        if (db is not None) and (db.get_run(RUN) not in good_runs):
            continue

        # Format run number
        FORMATTED_RUN = "%06d" % RUN

        # Find files for run number directly in the input dir. (The former second
        # attempt searched the same directory again with a trailing "/", so it
        # could never find anything the first attempt had missed.)
        file_list = find_files(INDATA_TOPDIR, FORMATTED_RUN, INPUT_FILE_NUM)
        if(VERBOSE == True):
            print(str(len(file_list)) + " files found for run " + str(RUN))

        # Add jobs to workflow
        for FILEPATH in file_list:
            add_job(WORKFLOW, FILEPATH, config_dict)
# Script entry point: run main() on the command-line arguments, program name excluded.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)