From b5a02fc102f71cb771e091bcbdfb995d99c61282 Mon Sep 17 00:00:00 2001 From: Craig Warren Date: Thu, 16 Mar 2017 12:36:26 +0000 Subject: [PATCH] Updated handling of job array simulations and ability to restart a simulation from a specific model number. --- gprMax/gprMax.py | 105 ++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/gprMax/gprMax.py b/gprMax/gprMax.py index c8edb18d..294ab7b9 100644 --- a/gprMax/gprMax.py +++ b/gprMax/gprMax.py @@ -38,14 +38,15 @@ def main(): """This is the main function for gprMax.""" # Print gprMax logo, version, and licencing/copyright information - logo(__version__ + ' (Bowmore)') + logo(__version__ + ' (Big Smoke)') # Parse command line arguments parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument('inputfile', help='path to, and name of inputfile or file object') parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan') + parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)') + parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan') parser.add_argument('-mpi', action='store_true', default=False, help='flag to switch on MPI task farm') - parser.add_argument('-task', type=int, help='task identifier for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)') parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode') parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)') parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed') @@ -56,7 +57,7 @@ def main(): run_main(args) -def api(inputfile, n=1, mpi=False, task=False, benchmark=False, geometry_only=False, geometry_fixed=False, write_processed=False, opt_taguchi=False): +def api(inputfile, n=1, task=None, restart=None, mpi=False, benchmark=False, geometry_only=False, geometry_fixed=False, write_processed=False, opt_taguchi=False): """If installed as a module this is the entry point.""" # Print gprMax logo, version, and licencing/copyright information @@ -69,8 +70,9 @@ def api(inputfile, n=1, mpi=False, task=False, benchmark=False, geometry_only=Fa args.inputfile = inputfile args.n = n - args.mpi = mpi args.task = task + args.restart = restart + args.mpi = mpi args.benchmark = benchmark args.geometry_only = geometry_only args.geometry_fixed = geometry_fixed @@ -86,7 +88,7 @@ def run_main(args): Args: args (dict): Namespace with input arguments from command line or api. """ - + with open_path_file(args.inputfile) as inputfile: # Get information about host machine @@ -120,14 +122,14 @@ def run_main(args): if args.mpi: if args.n == 1: raise GeneralError('MPI is not beneficial when there is only one model to run') + if args.task: + raise GeneralError('MPI cannot be combined with job array mode') run_mpi_sim(args, inputfile, usernamespace) - - # Standard behaviour - part of a job array on Open Grid Scheduler/Grid Engine with each model parallelised with OpenMP (CPU) or CUDA (GPU) - elif args.task: - run_job_array_sim(args, inputfile, usernamespace) - + # Standard behaviour - models run serially with each model parallelised with OpenMP (CPU) or CUDA (GPU) else: + if args.task and args.restart: + raise GeneralError('Job array and restart modes cannot be used together') run_std_sim(args, inputfile, usernamespace) @@ -141,9 +143,21 @@ def run_std_sim(args, inputfile, usernamespace, optparams=None): optparams (dict): Optional argument. For Taguchi optimisation it provides the parameters to optimise and their values. """ + # Set range for number of models to run + if args.task: + # Job array feeds args.n number of single tasks + modelstart = args.task + modelend = args.task + 1 + elif args.restart: + modelstart = args.restart + modelend = modelstart + args.n + else: + modelstart = 1 + modelend = modelstart + args.n numbermodelruns = args.n + tsimstart = perf_counter() - for currentmodelrun in range(1, numbermodelruns + 1): + for currentmodelrun in range(modelstart, modelend): if optparams: # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace tmp = {} tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items()) @@ -151,34 +165,7 @@ def run_std_sim(args, inputfile, usernamespace, optparams=None): modelusernamespace.update({'optparams': tmp}) else: modelusernamespace = usernamespace - run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace) - tsimend = perf_counter() - simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart)) - print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr)))) - - -def run_job_array_sim(args, inputfile, usernamespace, optparams=None): - """Run standard simulation as part of a job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html) - each model is parallelised with OpenMP - - Args: - args (dict): Namespace with command line arguments - inputfile (object): File object for the input file. - usernamespace (dict): Namespace that can be accessed by user in any Python code blocks in input file. - optparams (dict): Optional argument. For Taguchi optimisation it provides the parameters to optimise and their values. - """ - - numbermodelruns = args.n - currentmodelrun = args.task - - tsimstart = perf_counter() - if optparams: # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace - tmp = {} - tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items()) - modelusernamespace = usernamespace.copy() - modelusernamespace.update({'optparams': tmp}) - else: - modelusernamespace = usernamespace - run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace) + run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace) tsimend = perf_counter() simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart)) print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr)))) @@ -216,13 +203,24 @@ def run_benchmark_sim(args, inputfile, usernamespace): cputhreads = np.append(cputhreads, int(maxthreads)) cputhreads = cputhreads[::-1] cputimes = np.zeros(len(cputhreads)) - numbermodelruns = len(cputhreads) + numbermodelruns = len(cputhreads) + modelend = numbermodelruns + 1 + usernamespace['number_model_runs'] = numbermodelruns - for currentmodelrun in range(1, numbermodelruns + 1): - os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1]) - cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, numbermodelruns, inputfile, usernamespace) + for currentmodelrun in range(1, modelend): + # Set args.gpu if doing GPU benchmark + if currentmodelrun > len(cputhreads): + if isinstance(gpus, list): + args.gpu = gpus[(currentmodelrun - 1) - len(cputhreads)] + else: + args.gpu = gpus + del os.environ['OMP_NUM_THREADS'] + gputimes[(currentmodelrun - 1) - len(cputhreads)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace) + else: + os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1]) + cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace) # Get model size (in cells) and number of iterations if currentmodelrun == 1: @@ -235,7 +233,7 @@ def run_benchmark_sim(args, inputfile, usernamespace): numcells = f.attrs['nx, ny, nz'] # Save number of threads and benchmarking times to NumPy archive - np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, iterations=iterations, numcells=numcells, version=__version__) + np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__) simcompletestr = '\n=== Simulation completed' print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr)))) @@ -263,12 +261,17 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None): status = MPI.Status() # get MPI status object name = MPI.Get_processor_name() # get name of processor/host + # Set range for number of models to run + modelstart = args.restart if args.restart else 1 + modelend = modelstart + args.n numbermodelruns = args.n + tsimstart = perf_counter() # Master process if rank == 0: - currentmodelrun = 1 + # Set current model run number (can use -task argument to start numbering from something other than 1) + currentmodelrun = modelstart numworkers = size - 1 closedworkers = 0 print('MPI master rank {} (PID {}) on {} using {} workers'.format(rank, os.getpid(), name, numworkers)) @@ -279,7 +282,7 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None): # Worker is ready, so send it a task if tag == tags.READY.value: - if currentmodelrun < numbermodelruns + 1: + if currentmodelrun < modelend: comm.send(currentmodelrun, dest=source, tag=tags.START.value) currentmodelrun += 1 else: @@ -296,19 +299,17 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None): # Worker process else: - while True: # Break out of loop when work receives exit message + while True: # Break out of loop when work receives exit message comm.send(None, dest=0, tag=tags.READY.value) currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status) #  Receive a model number to run from the master tag = status.Get_tag() # Run a model if tag == tags.START.value: - - # Get info and setup device ID for GPU(s) + gpuinfo = '' - print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name)) - + # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace if optparams: tmp = {} @@ -319,7 +320,7 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None): modelusernamespace = usernamespace # Run the model - run_model(args, currentmodelrun, numbermodelruns, inputfile, modelusernamespace) + run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace) comm.send(None, dest=0, tag=tags.DONE.value) elif tag == tags.EXIT.value: