Overhaul of MPI mode to spawn MPI workers from within gprMax, so it does not need to be run with mpirun/mpiexec.

This commit is contained in:
Craig Warren
2017-03-20 16:16:12 +00:00
Parent: aadf190cfa
Commit: 988478710a

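In outline, the change replaces the old tag-based task farm (every rank started under mpirun, with rank 0 handing model numbers to the others) with dynamic process management: a single gprMax process builds a work list, spawns its own MPI workers with MPI.COMM_WORLD.Spawn, and each worker connects back through MPI.Comm.Get_parent() and pulls work until it receives a StopIteration sentinel. The following is a minimal, self-contained sketch of that pattern, separate from the gprMax code in the diff below; the file name taskfarm.py, the --worker flag, and the toy work items are placeholders for illustration only, and mpi4py is assumed to be installed.

# taskfarm.py - minimal sketch (not gprMax code) of the spawn-based task farm
# this commit introduces: running 'python taskfarm.py 4' spawns 4 MPI workers
# from within the script, so no mpirun/mpiexec is needed on the command line.
import sys
from mpi4py import MPI

WORKER_FLAG = '--worker'  # hypothetical flag marking a spawned worker process

if WORKER_FLAG not in sys.argv:
    # Master: build a list of work items plus one stop sentinel per worker
    numberworkers = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    worklist = [{'currentmodelrun': i} for i in range(1, 11)]
    worklist += [StopIteration] * numberworkers

    # Spawn workers that run this same file with the worker flag appended
    comm = MPI.COMM_WORLD.Spawn(sys.executable,
                                args=[__file__, WORKER_FLAG],
                                maxprocs=numberworkers)

    # Reply to whichever worker asks next until the work list is exhausted
    status = MPI.Status()
    for work in worklist:
        comm.recv(source=MPI.ANY_SOURCE, status=status)
        comm.send(obj=work, dest=status.Get_source())
    comm.Disconnect()

else:
    # Worker: connect back to the parent and pull work until the sentinel arrives
    comm = MPI.Comm.Get_parent()
    rank = comm.Get_rank()
    for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
        print('worker {} running model {}'.format(rank, work['currentmodelrun']))
    comm.Disconnect()

The design choice is the same one the diff makes: a hidden worker flag lets one entry point act as both master and worker, and the StopIteration sentinel doubles as the shutdown message, which removes the need for the old READY/DONE/EXIT tag protocol.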

@@ -23,22 +23,23 @@ import datetime
 from enum import Enum
 import os
 from time import perf_counter
+import sys
 import h5py
 import numpy as np
-from gprMax._version import __version__
+from gprMax._version import __version__, codename
 from gprMax.constants import c, e0, m0, z0
 from gprMax.exceptions import GeneralError
 from gprMax.model_build_run import run_model
-from gprMax.utilities import get_host_info, get_terminal_width, human_size, logo, open_path_file
+from gprMax.utilities import get_host_info, get_terminal_width, human_size, logo, open_path_file, detect_gpus


 def main():
     """This is the main function for gprMax."""

     # Print gprMax logo, version, and licencing/copyright information
-    logo(__version__ + ' (Big Smoke)')
+    logo(__version__ + ' (' + codename + ')')

     # Parse command line arguments
     parser = argparse.ArgumentParser(prog='gprMax', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
@@ -46,7 +47,8 @@ def main():
     parser.add_argument('-n', default=1, type=int, help='number of times to run the input file, e.g. to create a B-scan')
     parser.add_argument('-task', type=int, help='task identifier (model number) for job array on Open Grid Scheduler/Grid Engine (http://gridscheduler.sourceforge.net/index.html)')
     parser.add_argument('-restart', type=int, help='model number to restart from, e.g. when creating B-scan')
-    parser.add_argument('-mpi', action='store_true', default=False, help='flag to switch on MPI task farm')
+    parser.add_argument('-mpi', type=int, help='number of MPI tasks, i.e. master + workers')
+    parser.add_argument('--mpi-worker', action='store_true', default=False, help=argparse.SUPPRESS)
     parser.add_argument('-benchmark', action='store_true', default=False, help='flag to switch on benchmarking mode')
     parser.add_argument('--geometry-only', action='store_true', default=False, help='flag to only build model and produce geometry file(s)')
     parser.add_argument('--geometry-fixed', action='store_true', default=False, help='flag to not reprocess model geometry, e.g. for B-scans where the geometry is fixed')
@@ -61,7 +63,7 @@ def api(inputfile, n=1, task=None, restart=None, mpi=False, benchmark=False, geo
"""If installed as a module this is the entry point."""
# Print gprMax logo, version, and licencing/copyright information
logo(__version__ + ' (Bowmore)')
logo(__version__ + ' (' + codename + ')')
class ImportArguments:
pass
@@ -111,8 +113,11 @@ def run_main(args):
     # Process for simulation with Taguchi optimisation #
     ####################################################
     elif args.opt_taguchi:
-        from gprMax.optimisation_taguchi import run_opt_sim
-        run_opt_sim(args, inputfile, usernamespace)
+        if args.mpi_worker: # Special case for MPI spawned workers - they do not need to enter the Taguchi optimisation mode
+            run_mpi_sim(args, inputfile, usernamespace)
+        else:
+            from gprMax.optimisation_taguchi import run_opt_sim
+            run_opt_sim(args, inputfile, usernamespace)

     ################################################
     # Process for standard simulation (CPU or GPU) #
@@ -210,17 +215,8 @@ def run_benchmark_sim(args, inputfile, usernamespace):
     usernamespace['number_model_runs'] = numbermodelruns

     for currentmodelrun in range(1, modelend):
-        # Set args.gpu if doing GPU benchmark
-        if currentmodelrun > len(cputhreads):
-            if isinstance(gpus, list):
-                args.gpu = gpus[(currentmodelrun - 1) - len(cputhreads)]
-            else:
-                args.gpu = gpus
-            del os.environ['OMP_NUM_THREADS']
-            gputimes[(currentmodelrun - 1) - len(cputhreads)] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
-        else:
-            os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
-            cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)
+        os.environ['OMP_NUM_THREADS'] = str(cputhreads[currentmodelrun - 1])
+        cputimes[currentmodelrun - 1] = run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, usernamespace)

         # Get model size (in cells) and number of iterations
         if currentmodelrun == 1:
@@ -233,7 +229,7 @@ def run_benchmark_sim(args, inputfile, usernamespace):
             numcells = f.attrs['nx, ny, nz']

     # Save number of threads and benchmarking times to NumPy archive
-    np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=gputimes, iterations=iterations, numcells=numcells, version=__version__)
+    np.savez(os.path.splitext(inputfile.name)[0], machineID=machineIDlong, gpuIDs=[], cputhreads=cputhreads, cputimes=cputimes, gputimes=[], iterations=iterations, numcells=numcells, version=__version__)

     simcompletestr = '\n=== Simulation completed'
     print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
@@ -251,83 +247,80 @@ def run_mpi_sim(args, inputfile, usernamespace, optparams=None):
     from mpi4py import MPI

-    # Define MPI message tags
-    tags = Enum('tags', {'READY': 0, 'DONE': 1, 'EXIT': 2, 'START': 3})
-    # Initializations and preliminaries
-    comm = MPI.COMM_WORLD # get MPI communicator object
-    size = comm.Get_size() # total number of processes
-    rank = comm.Get_rank() # rank of this process
-    status = MPI.Status() # get MPI status object
-    name = MPI.Get_processor_name() # get name of processor/host
+    # Get name of processor/host
+    name = MPI.Get_processor_name()

     # Set range for number of models to run
     modelstart = args.restart if args.restart else 1
     modelend = modelstart + args.n
     numbermodelruns = args.n

-    tsimstart = perf_counter()
+    # Number of workers and command line flag to indicate a spawned worker
+    worker = '--mpi-worker'
+    numberworkers = args.mpi

     # Master process
-    if rank == 0:
-        # Set current model run number (can use -task argument to start numbering from something other than 1)
-        currentmodelrun = modelstart
-        numworkers = size - 1
-        closedworkers = 0
-        print('MPI master rank {} (PID {}) on {} using {} workers'.format(rank, os.getpid(), name, numworkers))
-        while closedworkers < numworkers:
-            data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
-            source = status.Get_source()
-            tag = status.Get_tag()
+    if worker not in sys.argv:

-            # Worker is ready, so send it a task
-            if tag == tags.READY.value:
-                if currentmodelrun < modelend:
-                    comm.send(currentmodelrun, dest=source, tag=tags.START.value)
-                    currentmodelrun += 1
-                else:
-                    comm.send(None, dest=source, tag=tags.EXIT.value)
+        tsimstart = perf_counter()

-            # Worker has completed a task
-            elif tag == tags.DONE.value:
-                pass
+        print('MPI master rank (PID {}) on {} using {} workers'.format(os.getpid(), name, numberworkers))

-            # Worker has completed all tasks
-            elif tag == tags.EXIT.value:
-                print('MPI worker rank {} completed all tasks'.format(source))
-                closedworkers += 1
+        # Create a list of work
+        worklist = []
+        for model in range(modelstart, modelend):
+            workobj = dict()
+            workobj['currentmodelrun'] = model
+            if optparams:
+                workobj['optparams'] = optparams
+            worklist.append(workobj)
+        # Add stop sentinels
+        worklist += ([StopIteration] * numberworkers)
+        # Spawn workers
+        comm = MPI.COMM_WORLD.Spawn(sys.executable, args=['-m', 'gprMax', '-n', str(args.n)] + sys.argv[1::] + [worker], maxprocs=numberworkers)
+        # Reply to whoever asks until done
+        status = MPI.Status()
+        for work in worklist:
+            comm.recv(source=MPI.ANY_SOURCE, status=status)
+            comm.send(obj=work, dest=status.Get_source())
+        # Shutdown
+        comm.Disconnect()
+        tsimend = perf_counter()
+        simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
+        print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))

     # Worker process
-    else:
-        while True:  # Break out of loop when work receives exit message
-            comm.send(None, dest=0, tag=tags.READY.value)
-            currentmodelrun = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)  #  Receive a model number to run from the master
-            tag = status.Get_tag()
+    elif worker in sys.argv:

-            # Run a model
-            if tag == tags.START.value:
+        # Connect to parent
+        try:
+            comm = MPI.Comm.Get_parent() # get MPI communicator object
+            rank = comm.Get_rank() # rank of this process
+        except:
+            raise ValueError('Could not connect to parent')

-                gpuinfo = ''
-                print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name))
+        # Ask for work until stop sentinel
+        for work in iter(lambda: comm.sendrecv(0, dest=0), StopIteration):
+            currentmodelrun = work['currentmodelrun']

-                # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace
-                if optparams:
-                    tmp = {}
-                    tmp.update((key, value[currentmodelrun - 1]) for key, value in optparams.items())
-                    modelusernamespace = usernamespace.copy()
-                    modelusernamespace.update({'optparams': tmp})
-                else:
-                    modelusernamespace = usernamespace
+            gpuinfo = ''
+            print('MPI worker rank {} (PID {}) starting model {}/{}{} on {}'.format(rank, os.getpid(), currentmodelrun, numbermodelruns, gpuinfo, name))

-                # Run the model
-                run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)
-                comm.send(None, dest=0, tag=tags.DONE.value)
+            # If Taguchi optimistaion, add specific value for each parameter to optimise for each experiment to user accessible namespace
+            if 'optparams' in work:
+                tmp = {}
+                tmp.update((key, value[currentmodelrun - 1]) for key, value in work['optparams'].items())
+                modelusernamespace = usernamespace.copy()
+                modelusernamespace.update({'optparams': tmp})
+            else:
+                modelusernamespace = usernamespace

-            elif tag == tags.EXIT.value:
-                break
+            # Run the model
+            run_model(args, currentmodelrun, modelend - 1, numbermodelruns, inputfile, modelusernamespace)

-        comm.send(None, dest=0, tag=tags.EXIT.value)
-    tsimend = perf_counter()
-    simcompletestr = '\n=== Simulation completed in [HH:MM:SS]: {}'.format(datetime.timedelta(seconds=tsimend - tsimstart))
-    print('{} {}\n'.format(simcompletestr, '=' * (get_terminal_width() - 1 - len(simcompletestr))))
+        # Shutdown
+        comm.Disconnect()
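With this overhaul the task farm should be launchable as a single command, along the lines of python -m gprMax my_model.in -n 60 -mpi 8 (my_model.in is a placeholder input file here; per the new help text, -mpi gives the number of MPI tasks), rather than under mpirun/mpiexec. The master appends the hidden --mpi-worker flag itself when it spawns the workers, and the new check in run_main routes spawned workers straight to run_mpi_sim so they never enter the Taguchi optimisation mode.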