你已经派生过 gprMax
镜像自地址
https://gitee.com/sunhf/gprMax.git
已同步 2025-08-06 20:46:52 +08:00
Updated handling of job array simulations and added the ability to restart a simulation from a specific model number.
这个提交包含在:
105
gprMax/gprMax.py
105
gprMax/gprMax.py
@@ -38,14 +38,15 @@ def main():
|
||||
"""This is the main function for gprMax."""
|
||||
|
||||
# Print gprMax logo, version, and licencing/copyright information
|
||||
logo(__version__ + ' (Bowmore)')
|
||||
logo(__version__ + ' (Big Smoke)')
|
||||
|
||||
# Parse command line arguments
|
||||
parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
parser.add_argument('inputfile', help='path to, and name of inputfile or file object')
|
||||
parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
|
||||
parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
|
||||
parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
|
||||
parser.add_argument('-mpi', action='store_true', default=False, help='flag to switch on MPI task farm')
|
||||
parser.add_argument('-task', type=int, help='task identifier for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
|
||||
parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
|
||||
parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
|
||||
parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
|
||||
@@ -56,7 +57,7 @@ def main():
|
||||
run_main(args)
|
||||
|
||||
|
||||
def api(inputfile, n=1, mpi=False, task=False, benchmark=False, geometry_only=False, geometry_fixed=False, write_processed=False, opt_taguchi=False):
|
||||
def api(inputfile, n=1, task=None, restart=None, mpi=False, benchmark=False, geometry_only=False, geometry_fixed=False, write_processed=False, opt_taguchi=False):
|
||||
"""If installed as a module this is the entry point."""
|
||||
|
||||
# Print gprMax logo, version, and licencing/copyright information
|
||||
@@ -69,8 +70,9 @@ def api(inputfile, n=1, mpi=False, task=False, benchmark=False, geometry_only=Fa
|
||||
|
||||
args.inputfile = inputfile
|
||||
args.n = n
|
||||
args.mpi = mpi
|
||||
args.task = task
|
||||
args.restart = restart
|
||||
args.mpi = mpi
|
||||
args.benchmark = benchmark
|
||||
args.geometry_only = geometry_only
|
||||
args.geometry_fixed = geometry_fixed
|
||||
@@ -86,7 +88,7 @@ def run_main(args):
|
||||
Args:
|
||||
args (dict): Namespace with input arguments from command line or api.
|
||||
"""
|
||||
|
||||
|
||||
with open_path_file(args.inputfile) as inputfile:
|
||||
|
||||
# Get information about host machine
|
||||
@@ -120,14 +122,14 @@ def run_main(args):
|
||||
if args.mpi:
|
||||
if args.n == 1:
|
||||
raise GeneralError('MPI is not beneficial when there is only one model to run')
|
||||
if args.task:
|
||||
raise GeneralError('MPI cannot be combined with job array mode')
|
||||
run_mpi_sim(args, inputfile, usernamespace)
|
||||
|
||||
# Standard behaviour - part of a job array on Open Grid Scheduler/Grid Engine with each model parallelised with OpenMP (CPU) or CUDA (GPU)
|
||||
elif args.task:
|
||||
run_job_array_sim(args, inputfile, usernamespace)
|
||||
|
||||
|
||||
# Standard behaviour - models run serially with each model parallelised with OpenMP (CPU) or CUDA (GPU)
|
||||
else:
|
||||
if args.task and args.restart:
|
||||
raise GeneralError('Job array and restart modes cannot be used together')
|
||||
run_std_sim(args, inputfile, usernamespace)
|
||||
|
||||
|
||||
@@ -141,9 +143,21 @@ def run_std_sim(args, inputfile, usernamespace, optparams=None):
|
||||
optparams (dict): Optional argument. For Taguchi optimisation it provides the parameters to optimise and their values.
|
||||
"""
|
||||
|
||||
# Set range for number of models to run
|
||||
if args.task:
|
||||
# Job array feeds args.n number of single tasks
|
||||
modelstart = args.task
|
||||
modelend = args.task + 1
|
||||
elif args.restart:
|
||||
modelstart = args.restart
|
||||
modelend = modelstart + args.n
|
||||
else:
|
||||
modelstart = 1
|
||||
modelend = modelstart + args.n
|
||||
numbermodelruns = args.n
|
||||
|
||||
tsimstart = perf_counter()
|
||||
for currentmodelrun in range(1, numbermodelruns + 1):
|
||||
for currentmodelrun in range(modelstart, modelend):
|
||||
if optparams: # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace
|
||||
tmp = {}
|
||||
tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items())
|
||||
@@ -151,34 +165,7 @@ def run_std_sim(args, inputfile, usernamespace, optparams=None):
|
||||
modelusernamespace.update({'optparams': tmp})
|
||||
else:
|
||||
modelusernamespace = usernamespace
|
||||
run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace)
|
||||
tsimend = perf_counter()
|
||||
simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
|
||||
print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
|
||||
|
||||
|
||||
def run_job_array_sim(args, inputfile, usernamespace, optparams=None):
|
||||
"""Run standard simulation as part of a job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html) - each model is parallelised with OpenMP
|
||||
|
||||
Args:
|
||||
args (dict): Namespace with command line arguments
|
||||
inputfile (object): File object for the input file.
|
||||
usernamespace (dict): Namespace that can be accessed by user in any Python code blocks in input file.
|
||||
optparams (dict): Optional argument. For Taguchi optimisation it provides the parameters to optimise and their values.
|
||||
"""
|
||||
|
||||
numbermodelruns = args.n
|
||||
currentmodelrun = args.task
|
||||
|
||||
tsimstart = perf_counter()
|
||||
if optparams: # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace
|
||||
tmp = {}
|
||||
tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items())
|
||||
modelusernamespace = usernamespace.copy()
|
||||
modelusernamespace.update({'optparams': tmp})
|
||||
else:
|
||||
modelusernamespace = usernamespace
|
||||
run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace)
|
||||
run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)
|
||||
tsimend = perf_counter()
|
||||
simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
|
||||
print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
|
||||
@@ -216,13 +203,24 @@ def run_benchmark_sim(args, inputfile, usernamespace):
|
||||
cputhreads = np.append(cputhreads, int(maxthreads))
|
||||
cputhreads = cputhreads[::-1]
|
||||
cputimes = np.zeros(len(cputhreads))
|
||||
numbermodelruns = len(cputhreads)
|
||||
|
||||
numbermodelruns = len(cputhreads)
|
||||
modelend = numbermodelruns + 1
|
||||
|
||||
usernamespace['number_model_runs'] = numbermodelruns
|
||||
|
||||
for currentmodelrun in range(1, numbermodelruns + 1):
|
||||
os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
|
||||
cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, numbermodelruns, inputfile, usernamespace)
|
||||
for currentmodelrun in range(1, modelend):
|
||||
# Set args.gpu if doing GPU benchmark
|
||||
if currentmodelrun > len(cputhreads):
|
||||
if isinstance(gpus, list):
|
||||
args.gpu = gpus[(currentmodelrun - 1) - len(cputhreads)]
|
||||
else:
|
||||
args.gpu = gpus
|
||||
del os.environ['OMP_NUM_THREADS']
|
||||
gputimes[(currentmodelrun - 1) - len(cputhreads)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
|
||||
else:
|
||||
os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
|
||||
cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
|
||||
|
||||
# Get model size (in cells) and number of iterations
|
||||
if currentmodelrun == 1:
|
||||
@@ -235,7 +233,7 @@ def run_benchmark_sim(args, inputfile, usernamespace):
|
||||
numcells = f.attrs['nx, ny, nz']
|
||||
|
||||
# Save number of threads and benchmarking times to NumPy archive
|
||||
np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, iterations=iterations, numcells=numcells, version=__version__)
|
||||
np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__)
|
||||
|
||||
simcompletestr = '\n=== Simulation completed'
|
||||
print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
|
||||
@@ -263,12 +261,17 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
|
||||
status = MPI.Status() # get MPI status object
|
||||
name = MPI.Get_processor_name() # get name of processor/host
|
||||
|
||||
# Set range for number of models to run
|
||||
modelstart = args.restart if args.restart else 1
|
||||
modelend = modelstart + args.n
|
||||
numbermodelruns = args.n
|
||||
|
||||
tsimstart = perf_counter()
|
||||
|
||||
# Master process
|
||||
if rank == 0:
|
||||
currentmodelrun = 1
|
||||
# Set current model run number (can use -restart argument to start numbering from something other than 1)
|
||||
currentmodelrun = modelstart
|
||||
numworkers = size - 1
|
||||
closedworkers = 0
|
||||
print('MPI master rank {} (PID {}) on {} using {} workers'.format(rank, os.getpid(), name, numworkers))
|
||||
@@ -279,7 +282,7 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
|
||||
|
||||
# Worker is ready, so send it a task
|
||||
if tag == tags.READY.value:
|
||||
if currentmodelrun < numbermodelruns + 1:
|
||||
if currentmodelrun < modelend:
|
||||
comm.send(currentmodelrun, dest=source, tag=tags.START.value)
|
||||
currentmodelrun += 1
|
||||
else:
|
||||
@@ -296,19 +299,17 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
|
||||
|
||||
# Worker process
|
||||
else:
|
||||
while True: # Break out of loop when work receives exit message
|
||||
while True: # Break out of loop when work receives exit message
|
||||
comm.send(None, dest=0, tag=tags.READY.value)
|
||||
currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status) # Receive a model number to run from the master
|
||||
tag = status.Get_tag()
|
||||
|
||||
# Run a model
|
||||
if tag == tags.START.value:
|
||||
|
||||
# Get info and setup device ID for GPU(s)
|
||||
|
||||
gpuinfo = ''
|
||||
|
||||
print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name))
|
||||
|
||||
|
||||
# If Taguchi optimisation, add specific value for each parameter to optimise for each experiment to user accessible namespace
|
||||
if optparams:
|
||||
tmp = {}
|
||||
@@ -319,7 +320,7 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
|
||||
modelusernamespace = usernamespace
|
||||
|
||||
# Run the model
|
||||
run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace)
|
||||
run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)
|
||||
comm.send(None, dest=0, tag=tags.DONE.value)
|
||||
|
||||
elif tag == tags.EXIT.value:
|
||||
|
在新工单中引用
屏蔽一个用户