From 988478710a1d16b3ec6fe800f9977acf2d4814fe Mon Sep 17 00:00:00 2001
From: Craig Warren
Date: Mon, 20 Mar 2017 16:16:12 +0000
Subject: [PATCH] Overhaul of MPI mode to spawn MPI workers from within gprMax, so it doesn't need to be run with mpirun/mpiexec.

---
 gprMax/gprMax.py | 161 +++++++++++++++++++++++------------------------
 1 file changed, 77 insertions(+), 84 deletions(-)

diff --git a/gprMax/gprMax.py b/gprMax/gprMax.py
index 294ab7b9..e29e09bd 100644
--- a/gprMax/gprMax.py
+++ b/gprMax/gprMax.py
@@ -23,22 +23,23 @@ import datetime
 from enum import Enum
 import os
 from time import perf_counter
+import sys
 
 import h5py
 import numpy as np
 
-from gprMax._version import __version__
+from gprMax._version import __version__, codename
 from gprMax.constants import c, e0, m0, z0
 from gprMax.exceptions import GeneralError
 from gprMax.model_build_run import run_model
-from gprMax.utilities import get_host_info, get_terminal_width, human_size, logo, open_path_file
+from gprMax.utilities import get_host_info, get_terminal_width, human_size, logo, open_path_file, detect_gpus
 
 
 def main():
     """This is the main function for gprMax."""
 
     # Print gprMax logo, version, and licencing/copyright information
-    logo(__version__ + ' (Big Smoke)')
+    logo(__version__ + ' (' + codename + ')')
 
     # Parse command line arguments
     parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -46,7 +47,8 @@ def main():
     parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
     parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
     parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
-    parser.add_argument('-mpi', action='store_true', default=False, help='flag to switch on MPI task farm')
+    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
+    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
     parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
     parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
     parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
@@ -61,7 +63,7 @@ def api(inputfile, n=1, task=None, restart=None, mpi=False, benchmark=False, geo
     """If installed as a module this is the entry point."""
 
     # Print gprMax logo, version, and licencing/copyright information
-    logo(__version__ + ' (Bowmore)')
+    logo(__version__ + ' (' + codename + ')')
 
     class ImportArguments:
         pass
@@ -111,8 +113,11 @@ def run_main(args):
     # Process for simulation with Taguchi optimisation #
     ####################################################
     elif args.opt_taguchi:
-        from gprMax.optimisation_taguchi import run_opt_sim
-        run_opt_sim(args, inputfile, usernamespace)
+        if args.mpi_worker: # Special case for MPI spawned workers - they do not need to enter the Taguchi optimisation mode
+            run_mpi_sim(args, inputfile, usernamespace)
+        else:
+            from gprMax.optimisation_taguchi import run_opt_sim
+            run_opt_sim(args, inputfile, usernamespace)
 
     ################################################
     # Process for standard simulation (CPU or GPU) #
@@ -210,17 +215,8 @@ def run_benchmark_sim(args, inputfile, usernamespace):
     usernamespace['number_model_runs'] = numbermodelruns
 
     for currentmodelrun in range(1, modelend):
-        # Set args.gpu if doing GPU benchmark
-        if currentmodelrun > len(cputhreads):
-            if isinstance(gpus, list):
-                args.gpu = gpus[(currentmodelrun - 1) - len(cputhreads)]
-            else:
-                args.gpu = gpus
-            del os.environ['OMP_NUM_THREADS']
-            gputimes[(currentmodelrun - 1) - len(cputhreads)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-        else:
-            os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
-            cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
+        os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
+        cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
 
         # Get model size (in cells) and number of iterations
         if currentmodelrun == 1:
@@ -233,7 +229,7 @@ def run_benchmark_sim(args, inputfile, usernamespace):
             numcells = f.attrs['nx, ny, nz']
 
     # Save number of threads and benchmarking times to NumPy archive
-    np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__)
+    np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=[], iterations=iterations, numcells=numcells, version=__version__)
 
     simcompletestr = '\n=== Simulation completed'
     print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
@@ -250,84 +246,81 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
     """
 
     from mpi4py import MPI
-
-    # Define MPI message tags
-    tags = Enum('tags', {'READY': 0, 'DONE': 1, 'EXIT': 2, 'START': 3})
-
-    # Initializations and preliminaries
-    comm = MPI.COMM_WORLD # get MPI communicator object
-    size = comm.Get_size() # total number of processes
-    rank = comm.Get_rank() # rank of this process
-    status = MPI.Status() # get MPI status object
-    name = MPI.Get_processor_name() # get name of processor/host
+
+    # Get name of processor/host
+    name = MPI.Get_processor_name()
 
     # Set range for number of models to run
    modelstart = args.restart if args.restart else 1
     modelend = modelstart + args.n
     numbermodelruns = args.n
-
-    tsimstart = perf_counter()
+
+    # Number of workers and command line flag to indicate a spawned worker
+    worker = '--mpi-worker'
+    numberworkers = args.mpi
 
     # Master process
-    if rank == 0:
-        # Set current model run number (can use -task argument to start numbering from something other than 1)
-        currentmodelrun = modelstart
-        numworkers = size - 1
-        closedworkers = 0
-        print('MPI master rank {} (PID {}) on {} using {} workers'.format(rank, os.getpid(), name, numworkers))
-        while closedworkers < numworkers:
-            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
-            source = status.Get_source()
-            tag = status.Get_tag()
+    if worker not in sys.argv:
+
+        tsimstart = perf_counter()
+
+        print('MPI master rank (PID {}) on {} using {} workers'.format(os.getpid(), name, numberworkers))
+
+        # Create a list of work
+        worklist = []
+        for model in range(modelstart, modelend):
+            workobj = dict()
+            workobj['currentmodelrun'] = model
+            if optparams:
+                workobj['optparams'] = optparams
+            worklist.append(workobj)
+        # Add stop sentinels
+        worklist += ([StopIteration] * numberworkers)
 
-            # Worker is ready, so send it a task
-            if tag == tags.READY.value:
-                if currentmodelrun < modelend:
-                    comm.send(currentmodelrun, dest=source, tag=tags.START.value)
-                    currentmodelrun += 1
-                else:
-                    comm.send(None, dest=source, tag=tags.EXIT.value)
+        # Spawn workers
+        comm = MPI.COMM_WORLD.Spawn(sys.executable, args=['-m', 'gprMax', '-n', str(args.n)] + sys.argv[1::] + [worker], maxprocs=numberworkers)
 
-            # Worker has completed a task
-            elif tag == tags.DONE.value:
-                pass
+        # Reply to whoever asks until done
+        status = MPI.Status()
+        for work in worklist:
+            comm.recv(source=MPI.ANY_SOURCE, status=status)
+            comm.send(obj=work, dest=status.Get_source())
 
-            # Worker has completed all tasks
-            elif tag == tags.EXIT.value:
-                print('MPI worker rank {} completed all tasks'.format(source))
-                closedworkers += 1
+        # Shutdown
+        comm.Disconnect()
+
+        tsimend = perf_counter()
+        simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
+        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
 
     # Worker process
-    else:
-        while True: # Break out of loop when work receives exit message
-            comm.send(None, dest=0, tag=tags.READY.value)
-            currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status) #  Receive a model number to run from the master
-            tag = status.Get_tag()
+    elif worker in sys.argv:
 
-            # Run a model
-            if tag == tags.START.value:
-
-                gpuinfo = ''
-                print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name))
-
-                # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace
-                if optparams:
-                    tmp = {}
-                    tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items())
-                    modelusernamespace = usernamespace.copy()
-                    modelusernamespace.update({'optparams': tmp})
-                else:
-                    modelusernamespace = usernamespace
+        # Connect to parent
+        try:
+            comm = MPI.Comm.Get_parent() # get MPI communicator object
+            rank = comm.Get_rank() # rank of this process
+        except:
+            raise ValueError('Could not connect to parent')
 
-                # Run the model
-                run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)
-                comm.send(None, dest=0, tag=tags.DONE.value)
+        # Ask for work until stop sentinel
+        for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
+            currentmodelrun = work['currentmodelrun']
+
+            gpuinfo = ''
+            print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name))
+
+            # If Taguchi optimisation, add specific value for each parameter to optimise for each experiment to user accessible namespace
+            if 'optparams' in work:
+                tmp = {}
+                tmp.update((key, value[currentmodelrun - 1]) for key, value in work['optparams'].items())
+                modelusernamespace = usernamespace.copy()
+                modelusernamespace.update({'optparams': tmp})
+            else:
+                modelusernamespace = usernamespace
 
-            elif tag == tags.EXIT.value:
-                break
+            # Run the model
+            run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)
 
-        comm.send(None, dest=0, tag=tags.EXIT.value)
-
-    tsimend = perf_counter()
-    simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
-    print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
+        # Shutdown
+        comm.Disconnect()
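
Note on the task-farm pattern above: the master builds a list of work items terminated by one StopIteration sentinel per worker, spawns the workers itself with MPI_Comm_spawn (so no mpirun/mpiexec is needed on the command line), and then answers each worker's request with the next item; each worker loops on sendrecv over the parent intercommunicator until it receives a sentinel. A minimal, self-contained sketch of the same request/reply protocol follows. It is illustrative only: the script name, the '--worker' flag and the fake work items are assumptions rather than gprMax code, and it assumes mpi4py with an MPI implementation that supports singleton initialisation and MPI_Comm_spawn.

    # spawn_taskfarm.py - minimal sketch of a spawn-based MPI task farm (not gprMax code)
    import sys
    from mpi4py import MPI

    NUMWORKERS = 3

    def master():
        # Work items, terminated by one StopIteration sentinel per worker
        worklist = [{'currentmodelrun': i} for i in range(1, 7)]
        worklist += [StopIteration] * NUMWORKERS
        # Spawn worker copies of this script; no mpirun/mpiexec on the command line
        comm = MPI.COMM_WORLD.Spawn(sys.executable, args=[__file__, '--worker'], maxprocs=NUMWORKERS)
        status = MPI.Status()
        for work in worklist:
            comm.recv(source=MPI.ANY_SOURCE, status=status)  # wait for any worker to ask
            comm.send(obj=work, dest=status.Get_source())    # reply with the next item
        comm.Disconnect()

    def worker():
        comm = MPI.Comm.Get_parent()  # intercommunicator back to the spawning process
        rank = comm.Get_rank()
        # sendrecv posts a dummy request and blocks for the reply; the loop stops
        # when the StopIteration sentinel comes back from the master
        for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
            print('worker {} running model {}'.format(rank, work['currentmodelrun']))
        comm.Disconnect()

    if __name__ == '__main__':
        worker() if '--worker' in sys.argv else master()

Run it directly as 'python spawn_taskfarm.py'. The patch uses the same mechanism, so an MPI task farm in gprMax is started with something like 'python -m gprMax my_input.in -n 60 -mpi 8' (the input file name and the numbers are placeholders) rather than through mpirun/mpiexec.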